[FLINK-12374][table-planner-blink] Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan to StreamTransformation. (apache#8407)
beyond1920 authored and KurtYoung committed May 14, 2019
1 parent f919b23 commit 2826ff8
Showing 13 changed files with 1,254 additions and 22 deletions.
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.expressions

import org.apache.flink.api.common.typeinfo.TypeInformation

case class PlannerResolvedFieldReference(
name: String,
resultType: TypeInformation[_],
fieldIndex: Int) extends ResolvedFieldReference
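
A minimal usage sketch of this new case class (the field name, type, and index below are illustrative, not taken from the commit):

    import org.apache.flink.table.api.Types

    // Hypothetical field "amount" of SQL BIGINT type sitting at position 2
    // of the source's produced type.
    val ref = PlannerResolvedFieldReference("amount", Types.LONG, 2)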
@@ -18,7 +18,6 @@

package org.apache.flink.table.plan.nodes.logical

-import org.apache.flink.table.api.TableException
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableSourceScan.isTableSourceScan
@@ -49,7 +48,9 @@ class FlinkLogicalTableSourceScan(
extends TableScan(cluster, traitSet, relOptTable)
with FlinkLogicalRel {

-  val tableSource: TableSource[_] = relOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource
lazy val tableSource: TableSource[_] = tableSourceTable.tableSource

private lazy val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])

def copy(
traitSet: RelTraitSet,
@@ -63,15 +64,7 @@

override def deriveRowType(): RelDataType = {
val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]

-    tableSource match {
-      case s: StreamTableSource[_] =>
-        TableSourceUtil.getRelDataType(s, None, streaming = true, flinkTypeFactory)
-      case _: BatchTableSource[_] =>
-        flinkTypeFactory.buildLogicalRowType(
-          tableSource.getTableSchema, isStreaming = Option.apply(false))
-      case _ => throw new TableException("Unknown TableSource type.")
-    }
tableSourceTable.getRowType(flinkTypeFactory)
}

override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
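
The refactoring above swaps an eager unwrap for two lazy vals. A self-contained sketch of the Scala semantics this relies on (plain Scala, no Flink types; names are illustrative):

    class Scan(lookup: () => String) {
      // Resolved at most once, on first access, then cached.
      private lazy val table: String = { println("unwrapping"); lookup() }
      lazy val name: String = table // forces `table` on first use
    }

    val scan = new Scan(() => "orders")
    scan.name // prints "unwrapping", returns "orders"
    scan.name // returns "orders" without re-running the lookup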
@@ -18,18 +18,23 @@

package org.apache.flink.table.plan.nodes.physical.batch

import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.runtime.operators.DamBehavior
import org.apache.flink.streaming.api.transformations.StreamTransformation
-import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
import org.apache.flink.table.api.{BatchTableEnvironment, TableException, Types}
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
import org.apache.flink.table.plan.nodes.physical.PhysicalTableSourceScan
import org.apache.flink.table.plan.schema.FlinkRelOptTable
-import org.apache.flink.table.sources.BatchTableSource
import org.apache.flink.table.sources.{BatchTableSource, TableSourceUtil}
import org.apache.flink.table.`type`.TypeConverters.createInternalTypeFromTypeInfo
import org.apache.flink.table.codegen.CodeGeneratorContext
import org.apache.flink.table.plan.util.ScanUtil

import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rex.RexNode

import java.util

@@ -75,7 +80,58 @@ class BatchExecTableSourceScan(

override def translateToPlanInternal(
tableEnv: BatchTableEnvironment): StreamTransformation[BaseRow] = {
-    throw new TableException("Implements this")
val config = tableEnv.getConfig
val bts = tableSource.asInstanceOf[BatchTableSource[_]]
val inputTransform = bts.getBoundedStream(tableEnv.streamEnv).getTransformation

val fieldIndexes = TableSourceUtil.computeIndexMapping(
tableSource,
isStreamTable = false,
None)

// check that declared and actual type of table source DataStream are identical
if (createInternalTypeFromTypeInfo(inputTransform.getOutputType) !=
createInternalTypeFromTypeInfo(tableSource.getReturnType)) {
throw new TableException(s"TableSource of type ${tableSource.getClass.getCanonicalName} " +
s"returned a DataSet of type ${inputTransform.getOutputType} that does not match with " +
s"the type ${tableSource.getReturnType} declared by the TableSource.getReturnType() " +
s"method. Please validate the implementation of the TableSource.")
}

// get expression to extract rowtime attribute
val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression(
tableSource,
None,
cluster,
tableEnv.getRelBuilder,
Types.SQL_TIMESTAMP
)
if (needInternalConversion) {
ScanUtil.convertToInternalRow(
CodeGeneratorContext(config),
inputTransform.asInstanceOf[StreamTransformation[Any]],
fieldIndexes,
tableSource.getReturnType,
getRowType,
getTable.getQualifiedName,
config,
rowtimeExpression)
} else {
inputTransform.asInstanceOf[StreamTransformation[BaseRow]]
}

}

def needInternalConversion: Boolean = {
val fieldIndexes = TableSourceUtil.computeIndexMapping(
tableSource,
isStreamTable = false,
None)
ScanUtil.hasTimeAttributeField(fieldIndexes) ||
ScanUtil.needsConversion(
tableSource.getReturnType,
TypeExtractor.createTypeInfo(
tableSource, classOf[BatchTableSource[_]], tableSource.getClass, 0)
.getTypeClass.asInstanceOf[Class[_]])
}
}
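
needInternalConversion above requests a conversion step when the field mapping contains a time-attribute marker or when the source's physical type differs from what its generic signature declares. A hedged sketch of the first check, assuming the negative-marker convention hinted at by the "-1 or -2" comment in the stream variant below:

    // Assumed convention: rowtime attributes map to index -1, proctime to -2.
    def hasTimeAttributeField(fieldIndexes: Array[Int]): Boolean =
      fieldIndexes.exists(i => i == -1 || i == -2)

    hasTimeAttributeField(Array(0, 1, 2)) // false: only physical fields
    hasTimeAttributeField(Array(0, -1))   // true: a rowtime attribute is present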
@@ -18,17 +18,28 @@

package org.apache.flink.table.plan.nodes.physical.stream

import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks}
import org.apache.flink.streaming.api.transformations.StreamTransformation
-import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.api.{StreamTableEnvironment, TableException, Types}
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
import org.apache.flink.table.plan.nodes.physical.PhysicalTableSourceScan
import org.apache.flink.table.plan.schema.FlinkRelOptTable
-import org.apache.flink.table.sources.StreamTableSource
import org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, PreserveWatermarks, PunctuatedWatermarkAssigner}
import org.apache.flink.table.sources.{RowtimeAttributeDescriptor, StreamTableSource, TableSourceUtil}
import org.apache.flink.table.`type`.TypeConverters.createInternalTypeFromTypeInfo
import org.apache.flink.table.codegen.CodeGeneratorContext
import org.apache.flink.table.codegen.OperatorCodeGenerator._
import org.apache.flink.table.plan.util.ScanUtil
import org.apache.flink.table.runtime.AbstractProcessStreamOperator

import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rex.RexNode

import java.util

@@ -79,6 +90,137 @@

override protected def translateToPlanInternal(
tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = {
-    throw new TableException("Implements this")
val config = tableEnv.getConfig
val sts = tableSource.asInstanceOf[StreamTableSource[_]]
val inputTransform = sts.getDataStream(tableEnv.execEnv).getTransformation

val fieldIndexes = TableSourceUtil.computeIndexMapping(
tableSource,
isStreamTable = true,
None)

// check that declared and actual type of table source DataStream are identical
if (createInternalTypeFromTypeInfo(inputTransform.getOutputType) !=
createInternalTypeFromTypeInfo(tableSource.getReturnType)) {
throw new TableException(s"TableSource of type ${tableSource.getClass.getCanonicalName} " +
s"returned a DataStream of type ${inputTransform.getOutputType} that does not match with " +
s"the type ${tableSource.getReturnType} declared by the TableSource.getReturnType() " +
s"method. Please validate the implementation of the TableSource.")
}

// get expression to extract rowtime attribute
val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression(
tableSource,
None,
cluster,
tableEnv.getRelBuilder,
Types.SQL_TIMESTAMP
)

val streamTransformation = if (needInternalConversion) {
// extract time if the index is -1 or -2.
val (extractElement, resetElement) =
if (ScanUtil.hasTimeAttributeField(fieldIndexes)) {
(s"ctx.$ELEMENT = $ELEMENT;", s"ctx.$ELEMENT = null;")
} else {
("", "")
}
val ctx = CodeGeneratorContext(config).setOperatorBaseClass(
classOf[AbstractProcessStreamOperator[BaseRow]])
ScanUtil.convertToInternalRow(
ctx,
inputTransform.asInstanceOf[StreamTransformation[Any]],
fieldIndexes,
tableSource.getReturnType,
getRowType,
getTable.getQualifiedName,
config,
rowtimeExpression,
beforeConvert = extractElement,
afterConvert = resetElement)
} else {
inputTransform.asInstanceOf[StreamTransformation[BaseRow]]
}

val ingestedTable = new DataStream(tableEnv.execEnv, streamTransformation)

// generate watermarks for rowtime indicator
val rowtimeDesc: Option[RowtimeAttributeDescriptor] =
TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, None)

val withWatermarks = if (rowtimeDesc.isDefined) {
val rowtimeFieldIdx = getRowType.getFieldNames.indexOf(rowtimeDesc.get.getAttributeName)
val watermarkStrategy = rowtimeDesc.get.getWatermarkStrategy
watermarkStrategy match {
case p: PeriodicWatermarkAssigner =>
val watermarkGenerator = new PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p)
ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
case p: PunctuatedWatermarkAssigner =>
val watermarkGenerator = new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p)
ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
case _: PreserveWatermarks =>
// The watermarks have already been provided by the underlying DataStream.
ingestedTable
}
} else {
// No need to generate watermarks if no rowtime attribute is specified.
ingestedTable
}

withWatermarks.getTransformation
}

def needInternalConversion: Boolean = {
val fieldIndexes = TableSourceUtil.computeIndexMapping(
tableSource,
isStreamTable = true,
None)
ScanUtil.hasTimeAttributeField(fieldIndexes) ||
ScanUtil.needsConversion(
tableSource.getReturnType,
TypeExtractor.createTypeInfo(
tableSource, classOf[StreamTableSource[_]], tableSource.getClass, 0)
.getTypeClass.asInstanceOf[Class[_]])
}
}

/**
* Generates periodic watermarks based on a [[PeriodicWatermarkAssigner]].
*
* @param timeFieldIdx the index of the rowtime attribute.
* @param assigner the watermark assigner.
*/
private class PeriodicWatermarkAssignerWrapper(
timeFieldIdx: Int,
assigner: PeriodicWatermarkAssigner)
extends AssignerWithPeriodicWatermarks[BaseRow] {

override def getCurrentWatermark: Watermark = assigner.getWatermark

override def extractTimestamp(row: BaseRow, previousElementTimestamp: Long): Long = {
    val timestamp: Long = row.getLong(timeFieldIdx)
    assigner.nextTimestamp(timestamp)
    0L // the element timestamp is unused; only the generated watermarks matter
}
}

/**
* Generates punctuated watermarks based on a [[PunctuatedWatermarkAssigner]].
*
* @param timeFieldIdx the index of the rowtime attribute.
* @param assigner the watermark assigner.
*/
private class PunctuatedWatermarkAssignerWrapper(
timeFieldIdx: Int,
assigner: PunctuatedWatermarkAssigner)
extends AssignerWithPunctuatedWatermarks[BaseRow] {

override def checkAndGetNextWatermark(row: BaseRow, ts: Long): Watermark = {
val timestamp: Long = row.getLong(timeFieldIdx)
assigner.getWatermark(row, timestamp)
}

override def extractTimestamp(element: BaseRow, previousElementTimestamp: Long): Long = {
    0L // the element timestamp is unused; the watermark comes from checkAndGetNextWatermark
}
}
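
A hedged usage sketch of the periodic wrapper above, paired with Flink's built-in BoundedOutOfOrderTimestamps strategy; the assumption that field 1 of each BaseRow holds the rowtime in epoch milliseconds is illustrative:

    import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps

    // Watermarks trail the maximum observed rowtime by 5 seconds.
    val strategy = new BoundedOutOfOrderTimestamps(5000L)
    val assigner = new PeriodicWatermarkAssignerWrapper(timeFieldIdx = 1, strategy)
    // rowStream.assignTimestampsAndWatermarks(assigner) then emits watermarks
    // at the environment's configured auto-watermark interval.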
@@ -77,7 +77,7 @@ object ScanUtil {
val convertFunc = CodeGenUtils.genToInternal(ctx, inputType)
internalInType match {
case rt: RowType => (convertFunc, rt)
-      case _ => ((record: String) => s"$GENERIC_ROW.wrap(${convertFunc(record)})",
case _ => ((record: String) => s"$GENERIC_ROW.of(${convertFunc(record)})",
new RowType(internalInType))
}
}
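
The one-line fix above changes the generated wrapper call from GENERIC_ROW.wrap to GENERIC_ROW.of, so a non-row record is boxed into a single-field row. A hedged sketch of the intended runtime effect, assuming GenericRow.of is the blink dataformat's varargs row factory:

    import org.apache.flink.table.dataformat.GenericRow

    val record: java.lang.Long = 42L
    val row = GenericRow.of(record) // a one-field row holding the record
    row.getLong(0)                  // 42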