[FLINK-8094] [table] Extend ExistingField rowtime extractor to support ISO date strings.

This closes apache#6253.

- This patch improves ExistingField so that it handles ISO date formatted
  String fields in addition to the Long and Timestamp types.
- Add test code to cover ExistingField's new behavior.
  The credit for the test code should go to @xccui, since I copied the test method from the commit below:
  xccui@afcc5f1
- Document the new behavior of ExistingField in sourceSinks.md.
HeartSaVioR authored and fhueske committed Jul 5, 2018
1 parent 5cb080c commit cc59535
Showing 4 changed files with 52 additions and 14 deletions.
3 changes: 2 additions & 1 deletion docs/dev/table/sourceSinks.md
@@ -460,7 +460,8 @@ val source: KafkaTableSource = Kafka010JsonTableSource.builder()
Flink provides `TimestampExtractor` implementations for common use cases.
The following `TimestampExtractor` implementations are currently available:

- * `ExistingField(fieldName)`: Extracts the value of a rowtime attribute from an existing `LONG` or `SQL_TIMESTAMP` field.
+ * `ExistingField(fieldName)`: Extracts the value of a rowtime attribute from an existing `LONG`, `SQL_TIMESTAMP`, or ISO date formatted `STRING` field.
+   * An example of the ISO date format is '2018-05-28 12:34:56.000'.
* `StreamRecordTimestamp()`: Extracts the value of a rowtime attribute from the timestamp of the `DataStream` `StreamRecord`. Note, this `TimestampExtractor` is not available for batch table sources.

A custom `TimestampExtractor` can be defined by implementing the corresponding interface.
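For example, a rowtime attribute backed by an ISO date formatted `STRING` field can be declared with the `ExistingField` extractor in a table source's rowtime descriptor. A minimal sketch follows; the field name `eventTime` and the bounded out-of-orderness watermark strategy are assumptions for illustration, not part of this patch:

```scala
import java.util.Collections

import org.apache.flink.table.sources.RowtimeAttributeDescriptor
import org.apache.flink.table.sources.tsextractors.ExistingField
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps

// Inside a TableSource that implements DefinedRowtimeAttributes, where the
// physical field "eventTime" holds Strings such as '2018-05-28 12:34:56.000':
def getRowtimeAttributeDescriptors: java.util.List[RowtimeAttributeDescriptor] =
  Collections.singletonList(new RowtimeAttributeDescriptor(
    "eventTime",                           // name of the rowtime attribute
    new ExistingField("eventTime"),        // extract it from the existing String field
    new BoundedOutOfOrderTimestamps(5000L) // assumed watermark strategy: 5s out-of-orderness
  ))
```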
ExistingField.scala
@@ -18,12 +18,13 @@

package org.apache.flink.table.sources.tsextractors

- import org.apache.flink.api.common.typeinfo.TypeInformation
+ import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.table.api.{Types, ValidationException}
import org.apache.flink.table.expressions.{Cast, Expression, ResolvedFieldReference}

/**
- * Converts an existing [[Long]] or [[java.sql.Timestamp]] field into a rowtime attribute.
+ * Converts an existing [[Long]] or [[java.sql.Timestamp]], or
+ * ISO date formatted [[java.lang.String]] field into a rowtime attribute.
*
* @param field The field to convert into a rowtime attribute.
*/
@@ -32,27 +33,24 @@ final class ExistingField(val field: String) extends TimestampExtractor {
override def getArgumentFields: Array[String] = Array(field)

@throws[ValidationException]
- override def validateArgumentFields(physicalFieldTypes: Array[TypeInformation[_]]): Unit = {
+ override def validateArgumentFields(argumentFieldTypes: Array[TypeInformation[_]]): Unit = {
+ val fieldType = argumentFieldTypes(0)

- // get type of field to convert
- val fieldType = physicalFieldTypes(0)

- // check that the field to convert is of type Long or Timestamp
fieldType match {
case Types.LONG => // OK
case Types.SQL_TIMESTAMP => // OK
+ case Types.STRING => // OK
case _: TypeInformation[_] =>
throw ValidationException(
s"Field '$field' must be of type Long or Timestamp but is of type $fieldType.")
s"Field '$field' must be of type Long or Timestamp or String but is of type $fieldType.")
}
}

/**
- * Returns an [[Expression]] that casts a [[Long]] or [[java.sql.Timestamp]] field into a
- * rowtime attribute.
+ * Returns an [[Expression]] that casts a [[Long]] or [[java.sql.Timestamp]], or
+ * ISO date formatted [[java.lang.String]] field into a rowtime attribute.
*/
- def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {

+ override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
val fieldAccess: Expression = fieldAccesses(0)

fieldAccess.resultType match {
Expand All @@ -62,6 +60,8 @@ final class ExistingField(val field: String) extends TimestampExtractor {
case Types.SQL_TIMESTAMP =>
// cast timestamp to long
Cast(fieldAccess, Types.LONG)
+ case Types.STRING =>
+ Cast(Cast(fieldAccess, SqlTimeTypeInfo.TIMESTAMP), Types.LONG)
}
}
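In effect, a String-typed field now passes validation, and at runtime the nested Cast parses the ISO date string into a SQL timestamp and then into epoch milliseconds. A rough, self-contained illustration of that observable behavior (a sketch only, not the generated cast code; time-zone handling in the generated code may differ):

```scala
import java.sql.Timestamp

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.table.sources.tsextractors.ExistingField

object ExistingFieldStringSketch {
  def main(args: Array[String]): Unit = {
    val extractor = new ExistingField("eventTime")

    // After this patch a String-typed field passes validation; unrelated types
    // such as Types.INT would still cause a ValidationException.
    extractor.validateArgumentFields(Array[TypeInformation[_]](Types.STRING))

    // Cast(Cast(field, SqlTimeTypeInfo.TIMESTAMP), Types.LONG) behaves roughly like
    // parsing the ISO string and taking the epoch milliseconds:
    val millis: Long = Timestamp.valueOf("2018-05-28 12:34:56.000").getTime
    println(millis) // exact value depends on the JVM time zone
  }
}
```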

TableSourceValidationTest.scala
@@ -212,7 +212,7 @@ class TableSourceValidationTest {
val schema = new TableSchema(
fieldNames,
Array(Types.LONG, Types.SQL_TIMESTAMP, Types.INT))
- val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row](), rowtime = "name")
+ val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row](), rowtime = "amount")

// should fail because configured rowtime field is not of type Long or Timestamp
tEnv.registerTableSource("testTable", ts)
TableSourceITCase.scala
@@ -300,6 +300,42 @@ class TableSourceITCase extends AbstractTestBase {
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}

+ @Test
+ def testRowtimeStringTableSource(): Unit = {
+ StreamITCase.testResults = mutable.MutableList()
+ val tableName = "MyTable"
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val data = Seq(
+ "1970-01-01 00:00:00",
+ "1970-01-01 00:00:01",
+ "1970-01-01 00:00:01",
+ "1970-01-01 00:00:02",
+ "1970-01-01 00:00:04")
+
+ val schema = new TableSchema(Array("rtime"), Array(Types.SQL_TIMESTAMP))
+ val returnType = Types.STRING
+
+ val tableSource = new TestTableSourceWithTime(schema, returnType, data, "rtime", null)
+ tEnv.registerTableSource(tableName, tableSource)
+
+ tEnv.scan(tableName)
+ .window(Tumble over 1.second on 'rtime as 'w)
+ .groupBy('w)
+ .select('w.start, 1.count)
+ .addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = Seq(
+ "1970-01-01 00:00:00.0,1",
+ "1970-01-01 00:00:01.0,2",
+ "1970-01-01 00:00:02.0,1",
+ "1970-01-01 00:00:04.0,1")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }

@Test
def testProctimeStringTableSource(): Unit = {
StreamITCase.testResults = mutable.MutableList()
@@ -741,4 +777,5 @@ class TableSourceITCase extends AbstractTestBase {
val expected = Seq("(1,A,1)", "(6,C,10)", "(6,D,20)")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}

}
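The `TestTableSourceWithTime` helper used in the new test is not part of this diff. For a self-contained picture, a source along the following lines would exercise the same String-to-rowtime path; this is a sketch with assumed names and an assumed ascending-watermark strategy, not the actual test utility:

```scala
import java.util.Collections

import scala.collection.JavaConverters._

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.{TableSchema, Types}
import org.apache.flink.table.sources.tsextractors.ExistingField
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps
import org.apache.flink.table.sources.{DefinedRowtimeAttributes, RowtimeAttributeDescriptor, StreamTableSource}

/** Hypothetical source that emits ISO date Strings and exposes them as a rowtime column. */
class StringRowtimeSource(data: Seq[String]) extends StreamTableSource[String]
  with DefinedRowtimeAttributes {

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[String] =
    execEnv.fromCollection(data.asJava, Types.STRING)

  override def getReturnType: TypeInformation[String] = Types.STRING

  override def getTableSchema: TableSchema =
    new TableSchema(Array("rtime"), Array(Types.SQL_TIMESTAMP))

  override def getRowtimeAttributeDescriptors: java.util.List[RowtimeAttributeDescriptor] =
    Collections.singletonList(new RowtimeAttributeDescriptor(
      "rtime",
      new ExistingField("rtime"),   // the physical String field backs the logical rowtime column
      new AscendingTimestamps()))   // assumed watermark strategy
}
```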
