Skip to content

Commit

Permalink
[FLINK-6602] [table] Prevent TableSources with empty time attribute n…
Browse files Browse the repository at this point in the history
…ames.

This closes apache#4135.
  • Loading branch information
zhe li authored and fhueske committed Jun 19, 2017
1 parent d78eeca commit 850e4d9
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,27 @@ class StreamTableSourceTable[T](
val fieldCnt = fieldNames.length

val rowtime = tableSource match {
case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
case nullTimeSource : DefinedRowtimeAttribute
if nullTimeSource.getRowtimeAttribute == null =>
None
case emptyStringTimeSource: DefinedRowtimeAttribute
if emptyStringTimeSource.getRowtimeAttribute.trim.equals("") =>
throw TableException("The name of the rowtime attribute must not be empty.")
case timeSource: DefinedRowtimeAttribute =>
val rowtimeAttribute = timeSource.getRowtimeAttribute
Some((fieldCnt, rowtimeAttribute))
case _ =>
None
}

val proctime = tableSource match {
case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute != null =>
case nullTimeSource : DefinedProctimeAttribute
if nullTimeSource.getProctimeAttribute == null =>
None
case emptyStringTimeSource: DefinedProctimeAttribute
if emptyStringTimeSource.getProctimeAttribute.trim.equals("") =>
throw TableException("The name of the proctime attribute must not be empty.")
case timeSource: DefinedProctimeAttribute =>
val proctimeAttribute = timeSource.getProctimeAttribute
Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute))
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.Types
import org.apache.flink.table.api.{TableException, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, StreamTableSource}
import org.apache.flink.table.utils.TableTestBase
Expand Down Expand Up @@ -121,6 +121,28 @@ class TableSourceTest extends TableTestBase {
)
util.verifyTable(t, expected)
}

@Test(expected = classOf[TableException])
def testRowtimeTableSourceWithEmptyName(): Unit = {
val util = streamTestUtil()
util.tEnv.registerTableSource("rowTimeT", new TestRowtimeSource(" "))

val t = util.tEnv.scan("rowTimeT")
.select('id)

util.tEnv.optimize(t.getRelNode, false)
}

@Test(expected = classOf[TableException])
def testProctimeTableSourceWithEmptyName(): Unit = {
val util = streamTestUtil()
util.tEnv.registerTableSource("procTimeT", new TestProctimeSource(" "))

val t = util.tEnv.scan("procTimeT")
.select('id)

util.tEnv.optimize(t.getRelNode, false)
}
}

class TestRowtimeSource(timeField: String)
Expand Down

0 comments on commit 850e4d9

Please sign in to comment.