Skip to content

Commit

Permalink
[FLINK-8096] [table] Fix time attribute materialization when writing …
Browse files Browse the repository at this point in the history
…to TableSink

This closes apache#5025.
  • Loading branch information
dianfu authored and twalthr committed Nov 20, 2017
1 parent 51b5b53 commit 691c48a
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,12 @@ abstract class StreamTableEnvironment(
"UpsertStreamTableSink requires that Table has a full primary keys if it is updated.")
}
val outputType = sink.getOutputType
val resultType = getResultType(table.getRelNode, optimizedPlan)
// translate the Table into a DataStream and provide the type that the TableSink expects.
val result: DataStream[T] =
translate(
optimizedPlan,
table.getRelNode.getRowType,
resultType,
streamQueryConfig,
withChangeFlag = true)(outputType)
// Give the DataStream to the TableSink to emit it.
Expand All @@ -254,11 +255,12 @@ abstract class StreamTableEnvironment(
"AppendStreamTableSink requires that Table has only insert changes.")
}
val outputType = sink.getOutputType
val resultType = getResultType(table.getRelNode, optimizedPlan)
// translate the Table into a DataStream and provide the type that the TableSink expects.
val result: DataStream[T] =
translate(
optimizedPlan,
table.getRelNode.getRowType,
resultType,
streamQueryConfig,
withChangeFlag = false)(outputType)
// Give the DataStream to the TableSink to emit it.
Expand Down Expand Up @@ -727,19 +729,7 @@ abstract class StreamTableEnvironment(
val relNode = table.getRelNode
val dataStreamPlan = optimize(relNode, updatesAsRetraction)

// zip original field names with optimized field types
val fieldTypes = relNode.getRowType.getFieldList.asScala
.zip(dataStreamPlan.getRowType.getFieldList.asScala)
// get name of original plan and type of optimized plan
.map(x => (x._1.getName, x._2.getType))
// add field indexes
.zipWithIndex
// build new field types
.map(x => new RelDataTypeFieldImpl(x._1._1, x._2, x._1._2))

// build a record type from list of field types
val rowType = new RelRecordType(
fieldTypes.toList.asInstanceOf[List[RelDataTypeField]].asJava)
val rowType = getResultType(relNode, dataStreamPlan)

translate(dataStreamPlan, rowType, queryConfig, withChangeFlag)
}
Expand Down Expand Up @@ -851,6 +841,25 @@ abstract class StreamTableEnvironment(
}
}

/**
* Returns the record type of the optimized plan with field names of the logical plan.
*/
private def getResultType(originRelNode: RelNode, optimizedPlan: RelNode): RelRecordType = {
// zip original field names with optimized field types
val fieldTypes = originRelNode.getRowType.getFieldList.asScala
.zip(optimizedPlan.getRowType.getFieldList.asScala)
// get name of original plan and type of optimized plan
.map(x => (x._1.getName, x._2.getType))
// add field indexes
.zipWithIndex
// build new field types
.map(x => new RelDataTypeFieldImpl(x._1._1, x._2, x._1._2))

// build a record type from list of field types
new RelRecordType(
fieldTypes.toList.asInstanceOf[List[RelDataTypeField]].asJava)
}

/**
* Returns the AST of the specified Table API and SQL queries and the execution plan to compute
* the result of the given [[Table]].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.flink.table.api.{TableEnvironment, TableSchema, Types}
import org.apache.flink.table.expressions.{ExpressionParser, TimeIntervalUnit}
import org.apache.flink.table.runtime.stream.TimeAttributesITCase.{AtomicTimestampWithEqualWatermark, TestPojo, TimestampWithEqualWatermark, TimestampWithEqualWatermarkPojo}
import org.apache.flink.table.runtime.utils.StreamITCase
import org.apache.flink.table.utils.TestTableSourceWithTime
import org.apache.flink.table.utils.{MemoryTableSinkUtil, TestTableSourceWithTime}
import org.apache.flink.types.Row
import org.junit.Assert._
import org.junit.Test
Expand Down Expand Up @@ -178,6 +178,30 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}

@Test
def testTableSink(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
MemoryTableSinkUtil.clear

val stream = env
.fromCollection(data)
.assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
.filter('rowtime.cast(Types.LONG) > 4)
.select('rowtime, 'rowtime.floor(TimeIntervalUnit.DAY), 'rowtime.ceil(TimeIntervalUnit.DAY))
.writeToSink(new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink)

env.execute()

val expected = Seq(
"1970-01-01 00:00:00.007,1970-01-01 00:00:00.0,1970-01-02 00:00:00.0",
"1970-01-01 00:00:00.008,1970-01-01 00:00:00.0,1970-01-02 00:00:00.0",
"1970-01-01 00:00:00.016,1970-01-01 00:00:00.0,1970-01-02 00:00:00.0")
assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
}

@Test
def testTableFunction(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
Expand Down

0 comments on commit 691c48a

Please sign in to comment.