Skip to content

Commit

Permalink
[FLINK-8012] [table] Fix TableSink config for tables with time attrib…
Browse files Browse the repository at this point in the history
…utes.

This closes apache#4974.
  • Loading branch information
fhueske committed Nov 8, 2017
1 parent 1ad1830 commit 8c5b547
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,12 @@ class Table(
val rowType = getRelNode.getRowType
val fieldNames: Array[String] = rowType.getFieldNames.asScala.toArray
val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
.map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray
.map(field => FlinkTypeFactory.toTypeInfo(field.getType))
.map {
// replace time indicator types by SQL_TIMESTAMP
case t: TypeInformation[_] if FlinkTypeFactory.isTimeIndicatorType(t) => Types.SQL_TIMESTAMP
case t: TypeInformation[_] => t
}.toArray

// configure the table sink
val configuredSink = sink.configure(fieldNames, fieldTypes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,22 +88,31 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val tEnv = TableEnvironment.getTableEnvironment(env)
env.setParallelism(4)

val input = StreamTestData.get3TupleDataStream(env)
.assignAscendingTimestamps(_._2)
.map(x => x).setParallelism(4) // increase DOP to 4

val results = input.toTable(tEnv, 'a, 'b, 'c)
val results = input.toTable(tEnv, 'a, 'b.rowtime, 'c)
.where('a < 5 || 'a > 17)
.select('c, 'b)
.writeToSink(new CsvTableSink(path))

env.execute()

val expected = Seq(
"Hi,1", "Hello,2", "Hello world,2", "Hello world, how are you?,3",
"Comment#12,6", "Comment#13,6", "Comment#14,6", "Comment#15,6").mkString("\n")
"Hi,1970-01-01 00:00:00.001",
"Hello,1970-01-01 00:00:00.002",
"Hello world,1970-01-01 00:00:00.002",
"Hello world, how are you?,1970-01-01 00:00:00.003",
"Comment#12,1970-01-01 00:00:00.006",
"Comment#13,1970-01-01 00:00:00.006",
"Comment#14,1970-01-01 00:00:00.006",
"Comment#15,1970-01-01 00:00:00.006").mkString("\n")

TestBaseUtils.compareResultsByLinesInMemory(expected, path)
}
Expand Down

0 comments on commit 8c5b547

Please sign in to comment.