Skip to content

Commit

Permalink
[FLINK-7571] [table] Fix translation of TableSource with time indicat…
Browse files Browse the repository at this point in the history
…ors.

This closes apache#4635.
  • Loading branch information
fhueske committed Sep 21, 2017
1 parent 6c315be commit 2a4ac66
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableEnvironment}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.plan.schema.{RowSchema, StreamTableSourceTable}
import org.apache.flink.table.sources._
import org.apache.flink.table.plan.schema.TableSourceTable
import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.table.sources.{StreamTableSource, TableSource}
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
Expand Down Expand Up @@ -108,7 +107,7 @@ class StreamTableSourceScan(
convertToInternalRow(
new RowSchema(getRowType),
inputDataStream,
new TableSourceTable(tableSource),
new StreamTableSourceTable(tableSource),
config)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.plan.schema

import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.{TableEnvironment, TableException}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.stats.FlinkStatistic
Expand All @@ -28,48 +29,113 @@ import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
class StreamTableSourceTable[T](
override val tableSource: TableSource[T],
override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
extends TableSourceTable[T](tableSource, statistic) {

extends TableSourceTable[T](
tableSource,
StreamTableSourceTable.adjustFieldIndexes(tableSource),
StreamTableSourceTable.adjustFieldNames(tableSource),
statistic) {

override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
val fieldTypes = StreamTableSourceTable.adjustFieldTypes(tableSource)

val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
flinkTypeFactory.buildLogicalRowType(
this.fieldNames,
fieldTypes)
}

val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
}

val fields = fieldNames.zip(fieldTypes)
object StreamTableSourceTable {

private def adjustFieldIndexes(tableSource: TableSource[_]): Array[Int] = {
val (rowtime, proctime) = getTimeIndicators(tableSource)

val original = TableEnvironment.getFieldIndices(tableSource)

// append rowtime marker
val withRowtime = if (rowtime.isDefined) {
original :+ TimeIndicatorTypeInfo.ROWTIME_MARKER
} else {
original
}

val withRowtime = tableSource match {
// append proctime marker
if (proctime.isDefined) {
withRowtime :+ TimeIndicatorTypeInfo.PROCTIME_MARKER
} else {
withRowtime
}
}

private def adjustFieldNames(tableSource: TableSource[_]): Array[String] = {
val (rowtime, proctime) = getTimeIndicators(tableSource)

val original = TableEnvironment.getFieldNames(tableSource)

// append rowtime field
val withRowtime = if (rowtime.isDefined) {
original :+ rowtime.get
} else {
original
}

// append proctime field
if (proctime.isDefined) {
withRowtime :+ proctime.get
} else {
withRowtime
}
}

private def adjustFieldTypes(tableSource: TableSource[_]): Array[TypeInformation[_]] = {
val (rowtime, proctime) = StreamTableSourceTable.getTimeIndicators(tableSource)

val original = TableEnvironment.getFieldTypes(tableSource.getReturnType)

// append rowtime type
val withRowtime = if (rowtime.isDefined) {
original :+ TimeIndicatorTypeInfo.ROWTIME_INDICATOR
} else {
original
}

// append proctime type
val withProctime = if (proctime.isDefined) {
withRowtime :+ TimeIndicatorTypeInfo.PROCTIME_INDICATOR
} else {
withRowtime
}

withProctime.asInstanceOf[Array[TypeInformation[_]]]
}

private def getTimeIndicators(tableSource: TableSource[_]): (Option[String], Option[String]) = {

val rowtime: Option[String] = tableSource match {
case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute == null =>
fields
None
case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute.trim.equals("") =>
throw TableException("The name of the rowtime attribute must not be empty.")
case timeSource: DefinedRowtimeAttribute =>
val rowtimeAttribute = timeSource.getRowtimeAttribute
fields :+ (rowtimeAttribute, TimeIndicatorTypeInfo.ROWTIME_INDICATOR)
Some(rowtimeAttribute)
case _ =>
fields
None
}

val withProctime = tableSource match {
val proctime: Option[String] = tableSource match {
case timeSource : DefinedProctimeAttribute if timeSource.getProctimeAttribute == null =>
withRowtime
None
case timeSource: DefinedProctimeAttribute
if timeSource.getProctimeAttribute.trim.equals("") =>
throw TableException("The name of the rowtime attribute must not be empty.")
case timeSource: DefinedProctimeAttribute =>
val proctimeAttribute = timeSource.getProctimeAttribute
withRowtime :+ (proctimeAttribute, TimeIndicatorTypeInfo.PROCTIME_INDICATOR)
Some(proctimeAttribute)
case _ =>
withRowtime
None
}

val (fieldNamesWithIndicators, fieldTypesWithIndicators) = withProctime.unzip

flinkTypeFactory.buildLogicalRowType(
fieldNamesWithIndicators,
fieldTypesWithIndicators)

(rowtime, proctime)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,23 @@ import org.apache.flink.table.sources.TableSource
/** Table which defines an external table via a [[TableSource]] */
class TableSourceTable[T](
val tableSource: TableSource[T],
override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
fieldIndexes: Array[Int],
fieldNames: Array[String],
override val statistic: FlinkStatistic)
extends FlinkTable[T](
typeInfo = tableSource.getReturnType,
fieldIndexes = TableEnvironment.getFieldIndices(tableSource),
fieldNames = TableEnvironment.getFieldNames(tableSource),
statistic)
fieldIndexes,
fieldNames,
statistic) {

def this(
tableSource: TableSource[T],
statistic: FlinkStatistic = FlinkStatistic.UNKNOWN) {

this(
tableSource,
TableEnvironment.getFieldIndices(tableSource),
TableEnvironment.getFieldNames(tableSource),
statistic)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,16 @@
package org.apache.flink.table.runtime.stream

import java.math.BigDecimal
import java.lang.{Long => JLong, Integer => JInt}

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecEnv}
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
Expand All @@ -33,11 +39,13 @@ import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.expressions.{ExpressionParser, TimeIntervalUnit}
import org.apache.flink.table.runtime.stream.TimeAttributesITCase.{TestPojo, TimestampWithEqualWatermark, TimestampWithEqualWatermarkPojo}
import org.apache.flink.table.runtime.utils.StreamITCase
import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, StreamTableSource}
import org.apache.flink.types.Row
import org.junit.Assert._
import org.junit.Test

import scala.collection.mutable
import scala.collection.JavaConverters._

/**
* Tests for access and materialization of time attributes.
Expand Down Expand Up @@ -369,6 +377,32 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
"1970-01-01 00:00:00.043,And me.,13")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}

@Test
def testTableSourceWithTimeIndicators(): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
tEnv.registerTableSource("testTable", new TestTableSource)
StreamITCase.clear

val result = tEnv
.scan("testTable")
.where('a % 2 === 1)
.select('rowtime, 'a, 'b, 'c)
.toAppendStream[Row]

result.addSink(new StreamITCase.StringSink[Row])
env.execute()

val expected = Seq(
"1970-01-01 00:00:01.0,1,A,1000",
"1970-01-01 00:00:03.0,3,C,3000",
"1970-01-01 00:00:05.0,5,E,5000")

assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
}

object TimeAttributesITCase {
Expand Down Expand Up @@ -413,3 +447,40 @@ object TimeAttributesITCase {
var c: String = _
}
}

class TestTableSource
extends StreamTableSource[Row]
with DefinedRowtimeAttribute
with DefinedProctimeAttribute {

override def getDataStream(env: JStreamExecEnv): DataStream[Row] = {

def toRow(i: Int, s: String, l: Long) = Row.of(i.asInstanceOf[JInt], s, l.asInstanceOf[JLong])

val rows = Seq(
toRow(1, "A", 1000L),
toRow(2, "B", 2000L),
toRow(3, "C", 3000L),
toRow(4, "D", 4000L),
toRow(5, "E", 5000L),
toRow(6, "F", 6000L)
)

env
.fromCollection(rows.asJava).returns(getReturnType)
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Row] {
override def extractAscendingTimestamp(r: Row): Long = r.getField(2).asInstanceOf[Long]
})
}

override def getRowtimeAttribute: String = "rowtime"

override def getProctimeAttribute: String = "proctime"

override def getReturnType: TypeInformation[Row] = {
new RowTypeInfo(
Array(Types.INT, Types.STRING, Types.LONG).asInstanceOf[Array[TypeInformation[_]]],
Array("a", "b", "c")
)
}
}

0 comments on commit 2a4ac66

Please sign in to comment.