diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala index 0430e497d6bfd..7349a0e1ae5ad 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala @@ -848,7 +848,12 @@ class Table( val rowType = getRelNode.getRowType val fieldNames: Array[String] = rowType.getFieldNames.asScala.toArray val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala - .map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray + .map(field => FlinkTypeFactory.toTypeInfo(field.getType)) + .map { + // replace time indicator types by SQL_TIMESTAMP + case t: TypeInformation[_] if FlinkTypeFactory.isTimeIndicatorType(t) => Types.SQL_TIMESTAMP + case t: TypeInformation[_] => t + }.toArray // configure the table sink val configuredSink = sink.configure(fieldNames, fieldTypes) diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala index 07934b89c0adf..b44d8eff90c9e 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala @@ -88,13 +88,16 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) env.setParallelism(4) val input = StreamTestData.get3TupleDataStream(env) + .assignAscendingTimestamps(_._2) .map(x => x).setParallelism(4) // increase DOP to 4 - val results = input.toTable(tEnv, 'a, 'b, 'c) + val results = input.toTable(tEnv, 'a, 'b.rowtime, 'c) .where('a < 5 || 'a > 17) .select('c, 'b) .writeToSink(new CsvTableSink(path)) @@ -102,8 +105,14 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase { env.execute() val expected = Seq( - "Hi,1", "Hello,2", "Hello world,2", "Hello world, how are you?,3", - "Comment#12,6", "Comment#13,6", "Comment#14,6", "Comment#15,6").mkString("\n") + "Hi,1970-01-01 00:00:00.001", + "Hello,1970-01-01 00:00:00.002", + "Hello world,1970-01-01 00:00:00.002", + "Hello world, how are you?,1970-01-01 00:00:00.003", + "Comment#12,1970-01-01 00:00:00.006", + "Comment#13,1970-01-01 00:00:00.006", + "Comment#14,1970-01-01 00:00:00.006", + "Comment#15,1970-01-01 00:00:00.006").mkString("\n") TestBaseUtils.compareResultsByLinesInMemory(expected, path) }