Skip to content

Commit

Permalink
[hotfix] [table] Fix IndexOOBE in DataStreamSortRule.
Browse files Browse the repository at this point in the history
This closes apache#5370.
  • Loading branch information
fhueske authored and twalthr committed Jan 31, 2018
1 parent 2cb5896 commit 2b76eca
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ class DataStreamSortRule
def checkTimeOrder(sort: FlinkLogicalSort): Boolean = {

val sortCollation = sort.collation
if (sortCollation.getFieldCollations.size() == 0) {
// no sort fields defined
return false
}
// get type of first sort field
val firstSortType = SortUtil.getFirstSortField(sortCollation, sort.getRowType).getType
// get direction of first sort field
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.flink.table.api.TableException
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.util.Preconditions

import java.util.Comparator

Expand All @@ -60,6 +61,7 @@ object SortUtil {
inputTypeInfo: TypeInformation[Row],
execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = {

Preconditions.checkArgument(collationSort.getFieldCollations.size() > 0)
val rowtimeIdx = collationSort.getFieldCollations.get(0).getFieldIndex

val collectionRowComparator = if (collationSort.getFieldCollations.size() > 1) {
Expand Down Expand Up @@ -158,6 +160,7 @@ object SortUtil {
* @return The direction of the first sort field.
*/
def getFirstSortDirection(collationSort: RelCollation): Direction = {
Preconditions.checkArgument(collationSort.getFieldCollations.size() > 0)
collationSort.getFieldCollations.get(0).direction
}

Expand All @@ -169,6 +172,7 @@ object SortUtil {
* @return The first sort field.
*/
def getFirstSortField(collationSort: RelCollation, rowType: RelDataType): RelDataTypeField = {
Preconditions.checkArgument(collationSort.getFieldCollations.size() > 0)
val idx = collationSort.getFieldCollations.get(0).getFieldIndex
rowType.getFieldList.get(idx)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class SortTest extends TableTestBase {
'proctime.proctime, 'rowtime.rowtime)

@Test
def testSortProcessingTime() = {
def testSortProcessingTime(): Unit = {

val sqlQuery = "SELECT a FROM MyTable ORDER BY proctime, c"

Expand All @@ -47,7 +47,7 @@ class SortTest extends TableTestBase {
}

@Test
def testSortRowTime() = {
def testSortRowTime(): Unit = {

val sqlQuery = "SELECT a FROM MyTable ORDER BY rowtime, c"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class SortValidationTest extends TableTestBase {

// test should fail because time order is descending
@Test(expected = classOf[TableException])
def testSortProcessingTimeDesc() = {
def testSortProcessingTimeDesc(): Unit = {

val sqlQuery = "SELECT a FROM MyTable ORDER BY proctime DESC, c"
streamUtil.verifySql(sqlQuery, "")
Expand All @@ -41,9 +41,17 @@ class SortValidationTest extends TableTestBase {

// test should fail because time is not the primary order field
@Test(expected = classOf[TableException])
def testSortProcessingTimeSecondaryField() = {
def testSortProcessingTimeSecondaryField(): Unit = {

val sqlQuery = "SELECT a FROM MyTable ORDER BY c, proctime"
streamUtil.verifySql(sqlQuery, "")
}

// test should fail because LIMIT is not supported without sorting
@Test(expected = classOf[TableException])
def testLimitWithoutSorting(): Unit = {

val sqlQuery = "SELECT a FROM MyTable LIMIT 3"
streamUtil.verifySql(sqlQuery, "")
}
}

0 comments on commit 2b76eca

Please sign in to comment.