Skip to content

Commit

Permalink
[FLINK-6483] [table] Add materialization of time indicators.
Browse files Browse the repository at this point in the history
This closes apache#3862.
  • Loading branch information
twalthr authored and fhueske committed May 12, 2017
1 parent 2480887 commit b50ef4b
Show file tree
Hide file tree
Showing 23 changed files with 1,078 additions and 233 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,37 @@

package org.apache.flink.table.api

import _root_.java.util.concurrent.atomic.AtomicInteger
import _root_.java.lang.{Boolean => JBool}
import _root_.java.util.concurrent.atomic.AtomicInteger

import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.plan.hep.HepMatchOrder
import org.apache.calcite.rel.{RelNode, RelVisitor}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.{RelNode, RelVisitor}
import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
import org.apache.calcite.sql.SqlKind
import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.{RuleSet, RuleSets}
import org.apache.flink.api.common.typeinfo.AtomicType
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.calcite.RelTimeIndicatorConverter
import org.apache.flink.table.explain.PlanJsonParser
import org.apache.flink.table.expressions.{Expression, ProctimeAttribute, RowtimeAttribute, UnresolvedFieldReference}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait}
import org.apache.flink.table.plan.nodes.datastream._
import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait, _}
import org.apache.flink.table.plan.rules.FlinkRuleSets
import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable}
import org.apache.flink.table.runtime.{CRowInputJavaTupleOutputMapRunner, CRowInputMapRunner, CRowInputScalaTupleOutputMapRunner}
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.apache.flink.table.runtime.{CRowInputJavaTupleOutputMapRunner, CRowInputMapRunner, CRowInputScalaTupleOutputMapRunner}
import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink}
import org.apache.flink.table.sources.{StreamTableSource, TableSource}
import org.apache.flink.table.sources.{DefinedRowtimeAttribute, StreamTableSource, TableSource}
import org.apache.flink.table.typeutils.TypeCheckUtils
import org.apache.flink.types.Row

Expand Down Expand Up @@ -111,6 +110,17 @@ abstract class StreamTableEnvironment(
override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = {
checkValidTableName(name)

// check if event-time is enabled
tableSource match {
case dra: DefinedRowtimeAttribute if
execEnv.getStreamTimeCharacteristic != TimeCharacteristic.EventTime =>

throw TableException(
s"A rowtime attribute requires an EventTime time characteristic in stream environment. " +
s"But is: ${execEnv.getStreamTimeCharacteristic}")
case _ => // ok
}

tableSource match {
case streamTableSource: StreamTableSource[_] =>
registerTableInternal(name, new StreamTableSourceTable(streamTableSource))
Expand Down Expand Up @@ -390,6 +400,13 @@ abstract class StreamTableEnvironment(
// validate and extract time attributes
val (rowtime, proctime) = validateAndExtractTimeAttributes(streamType, fields)

// check if event-time is enabled
if (rowtime.isDefined && execEnv.getStreamTimeCharacteristic != TimeCharacteristic.EventTime) {
throw TableException(
s"A rowtime attribute requires an EventTime time characteristic in stream environment. " +
s"But is: ${execEnv.getStreamTimeCharacteristic}")
}

val dataStreamTable = new DataStreamTable[T](
dataStream,
fieldIndexes,
Expand Down Expand Up @@ -518,9 +535,9 @@ abstract class StreamTableEnvironment(
// 3. normalize the logical plan
val normRuleSet = getNormRuleSet
val normalizedPlan = if (normRuleSet.iterator().hasNext) {
runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, decorPlan, decorPlan.getTraitSet)
runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, convPlan, convPlan.getTraitSet)
} else {
decorPlan
convPlan
}

// 4. optimize the logical Flink plan
Expand Down
Loading

0 comments on commit b50ef4b

Please sign in to comment.