[FLINK-16991][table-planner-blink] Rename current LogicalSink into LogicalLegacySink
wuchong committed May 13, 2020
1 parent 78b516c commit 3c6df77
Showing 45 changed files with 350 additions and 403 deletions.
8 changes: 4 additions & 4 deletions docs/dev/table/common.md
@@ -1749,11 +1749,11 @@ the result of multiple-sinks plan is
 {% highlight text %}
 
 == Abstract Syntax Tree ==
-LogicalSink(name=[MySink1], fields=[count, word])
+LogicalLegacySink(name=[MySink1], fields=[count, word])
 +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
    +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
 
-LogicalSink(name=[MySink2], fields=[count, word])
+LogicalLegacySink(name=[MySink2], fields=[count, word])
 +- LogicalUnion(all=[true])
    :- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
@@ -1763,10 +1763,10 @@ LogicalSink(name=[MySink2], fields=[count, word])
 Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')], reuse_id=[1])
 +- TableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
 
-Sink(name=[MySink1], fields=[count, word])
+LegacySink(name=[MySink1], fields=[count, word])
 +- Reused(reference_id=[1])
 
-Sink(name=[MySink2], fields=[count, word])
+LegacySink(name=[MySink2], fields=[count, word])
 +- Union(all=[true], union=[count, word])
    :- Reused(reference_id=[1])
    +- TableSourceScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
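(Aside, not part of the commit: the explain output in the hunk above comes from a two-sink program along the following lines. This is a minimal sketch assuming the MySource1/MySource2 CSV sources and MySink1/MySink2 sinks are already registered, as in the surrounding documentation page, using the Flink 1.10-era Scala Table API.)

```scala
// Minimal sketch of the two-sink program whose explain() output is
// shown above. Assumes MySource1/MySource2 and MySink1/MySink2 are
// already registered in the catalog.
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tEnv = StreamTableEnvironment.create(env, settings)

// MySink1 receives the rows of MySource1 whose word starts with "F"
val filtered = tEnv.from("MySource1").where('word.like("F%"))
filtered.insertInto("MySink1")

// MySink2 receives the union of the filtered rows and all of MySource2
filtered.unionAll(tEnv.from("MySource2")).insertInto("MySink2")

// explain(false) renders the AST and the optimized plan for all sinks
val explanation = tEnv.explain(false)
println(explanation)
```

Because both sinks consume the same filtered input, the optimized plan computes the Calc node once and shares it, which is what the reuse_id=[1] and Reused(reference_id=[1]) entries in the physical plan above indicate.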
8 changes: 4 additions & 4 deletions docs/dev/table/common.zh.md
@@ -1730,11 +1730,11 @@ print(explanation)
 {% highlight text %}
 
 == Abstract Syntax Tree ==
-LogicalSink(name=[MySink1], fields=[count, word])
+LogicalLegacySink(name=[MySink1], fields=[count, word])
 +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
    +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
 
-LogicalSink(name=[MySink2], fields=[count, word])
+LogicalLegacySink(name=[MySink2], fields=[count, word])
 +- LogicalUnion(all=[true])
    :- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
@@ -1744,10 +1744,10 @@ LogicalSink(name=[MySink2], fields=[count, word])
 Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')], reuse_id=[1])
 +- TableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
 
-Sink(name=[MySink1], fields=[count, word])
+LegacySink(name=[MySink1], fields=[count, word])
 +- Reused(reference_id=[1])
 
-Sink(name=[MySink2], fields=[count, word])
+LegacySink(name=[MySink2], fields=[count, word])
 +- Union(all=[true], union=[count, word])
    :- Reused(reference_id=[1])
    +- TableSourceScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
20 changes: 2 additions & 18 deletions flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala
@@ -18,11 +18,10 @@
 
 package org.apache.flink.table.planner.calcite
 
-import org.apache.flink.table.planner.calcite.FlinkRelFactories.{ExpandFactory, RankFactory, SinkFactory}
+import org.apache.flink.table.planner.calcite.FlinkRelFactories.{ExpandFactory, RankFactory}
 import org.apache.flink.table.planner.plan.nodes.logical._
 import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase
 import org.apache.flink.table.runtime.operators.rank.{RankRange, RankType}
-import org.apache.flink.table.sinks.TableSink
 
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.RelOptTable.ToRelContext
@@ -60,7 +59,6 @@ object FlinkLogicalRelFactories {
   val FLINK_LOGICAL_TABLE_SCAN_FACTORY = new TableScanFactoryImpl
   val FLINK_LOGICAL_EXPAND_FACTORY = new ExpandFactoryImpl
   val FLINK_LOGICAL_RANK_FACTORY = new RankFactoryImpl
-  val FLINK_LOGICAL_SINK_FACTORY = new SinkFactoryImpl
 
   /** A [[RelBuilderFactory]] that creates a [[RelBuilder]] that will
     * create logical relational expressions for everything. */
@@ -75,8 +73,7 @@
       FLINK_LOGICAL_VALUES_FACTORY,
       FLINK_LOGICAL_TABLE_SCAN_FACTORY,
       FLINK_LOGICAL_EXPAND_FACTORY,
-      FLINK_LOGICAL_RANK_FACTORY,
-      FLINK_LOGICAL_SINK_FACTORY))
+      FLINK_LOGICAL_RANK_FACTORY))
 
   /**
    * Implementation of [[ProjectFactory]] that returns a [[FlinkLogicalCalc]].
@@ -261,17 +258,4 @@
         rankNumberType, outputRankNumber)
     }
   }
-
-  /**
-   * Implementation of [[FlinkRelFactories.SinkFactory]] that returns a [[FlinkLogicalSink]].
-   */
-  class SinkFactoryImpl extends SinkFactory {
-    def createSink(
-        input: RelNode,
-        sink: TableSink[_],
-        sinkName: String): RelNode = {
-      FlinkLogicalSink.create(input, sink, sinkName)
-    }
-  }
-
 }
13 changes: 1 addition & 12 deletions flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala
@@ -20,14 +20,13 @@ package org.apache.flink.table.planner.calcite
 
 import org.apache.flink.table.operations.QueryOperation
 import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
-import org.apache.flink.table.planner.calcite.FlinkRelFactories.{ExpandFactory, RankFactory, SinkFactory}
+import org.apache.flink.table.planner.calcite.FlinkRelFactories.{ExpandFactory, RankFactory}
 import org.apache.flink.table.planner.expressions.{PlannerWindowProperty, WindowProperty}
 import org.apache.flink.table.planner.plan.QueryOperationConverter
 import org.apache.flink.table.planner.plan.logical.LogicalWindow
 import org.apache.flink.table.planner.plan.nodes.calcite.{LogicalTableAggregate, LogicalWatermarkAssigner, LogicalWindowAggregate, LogicalWindowTableAggregate}
 import org.apache.flink.table.planner.plan.utils.AggregateUtil
 import org.apache.flink.table.runtime.operators.rank.{RankRange, RankType}
-import org.apache.flink.table.sinks.TableSink
 
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelCollation
@@ -71,10 +70,6 @@ class FlinkRelBuilder(
     Util.first(context.unwrap(classOf[RankFactory]), FlinkRelFactories.DEFAULT_RANK_FACTORY)
   }
 
-  private val sinkFactory: SinkFactory = {
-    Util.first(context.unwrap(classOf[SinkFactory]), FlinkRelFactories.DEFAULT_SINK_FACTORY)
-  }
-
   override def getRelOptSchema: RelOptSchema = relOptSchema
 
   override def getCluster: RelOptCluster = relOptCluster
@@ -91,12 +86,6 @@
     push(expand)
   }
 
-  def sink(sink: TableSink[_], sinkName: String): RelBuilder = {
-    val input = build()
-    val sinkNode = sinkFactory.createSink(input, sink, sinkName)
-    push(sinkNode)
-  }
-
   def rank(
       partitionKey: ImmutableBitSet,
       orderKey: RelCollation,
28 changes: 1 addition & 27 deletions flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelFactories.scala
@@ -18,9 +18,8 @@
 
 package org.apache.flink.table.planner.calcite
 
-import org.apache.flink.table.planner.plan.nodes.calcite.{LogicalExpand, LogicalRank, LogicalSink}
+import org.apache.flink.table.planner.plan.nodes.calcite.{LogicalExpand, LogicalRank}
 import org.apache.flink.table.runtime.operators.rank.{RankRange, RankType}
-import org.apache.flink.table.sinks.TableSink
 
 import org.apache.calcite.plan.Contexts
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
@@ -56,8 +55,6 @@ object FlinkRelFactories {
 
   val DEFAULT_RANK_FACTORY = new RankFactoryImpl
 
-  val DEFAULT_SINK_FACTORY = new SinkFactoryImpl
-
   /**
    * Can create a [[LogicalExpand]] of the
    * appropriate type for this rule's calling convention.
@@ -112,27 +109,4 @@
         rankNumberType, outputRankNumber)
     }
   }
-
-  /**
-   * Can create a [[LogicalSink]] of the
-   * appropriate type for this rule's calling convention.
-   */
-  trait SinkFactory {
-
-    def createSink(
-        input: RelNode,
-        sink: TableSink[_],
-        sinkName: String): RelNode
-  }
-
-  /**
-   * Implementation of [[SinkFactory]] that returns a [[LogicalSink]].
-   */
-  class SinkFactoryImpl extends SinkFactory {
-
-    def createSink(
-        input: RelNode,
-        sink: TableSink[_],
-        sinkName: String): RelNode = LogicalSink.create(input, sink, sinkName)
-  }
 }
6 changes: 3 additions & 3 deletions flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala
@@ -163,7 +163,7 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
       val input = snapshot.getInput.accept(this)
       snapshot.copy(snapshot.getTraitSet, input, snapshot.getPeriod)
 
-    case sink: LogicalSink =>
+    case sink: LogicalLegacySink =>
       var newInput = sink.getInput.accept(this)
       var needsConversion = false
 
@@ -181,7 +181,7 @@
       if (needsConversion) {
        newInput = LogicalProject.create(newInput, projects, newInput.getRowType.getFieldNames)
       }
-      new LogicalSink(
+      new LogicalLegacySink(
        sink.getCluster,
        sink.getTraitSet,
        newInput,
@@ -482,7 +482,7 @@ object RelTimeIndicatorConverter {
     val convertedRoot = rootRel.accept(converter)
 
     // the LogicalSink is converted in RelTimeIndicatorConverter before
-    if (rootRel.isInstanceOf[LogicalSink] || !needFinalTimeIndicatorConversion) {
+    if (rootRel.isInstanceOf[LogicalLegacySink] || !needFinalTimeIndicatorConversion) {
       return convertedRoot
     }
     var needsConversion = false
8 changes: 4 additions & 4 deletions flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -33,7 +33,7 @@ import org.apache.flink.table.planner.calcite.{CalciteParser, FlinkPlannerImpl,
 import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema
 import org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl
 import org.apache.flink.table.planner.hint.FlinkHints
-import org.apache.flink.table.planner.plan.nodes.calcite.LogicalSink
+import org.apache.flink.table.planner.plan.nodes.calcite.LogicalLegacySink
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode
 import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel
 import org.apache.flink.table.planner.plan.optimize.Optimizer
@@ -178,7 +178,7 @@ abstract class PlannerBase(
         val sinkSchema = s.getSink.getTableSchema
         // validate query schema and sink schema, and apply cast if possible
         val query = validateSchemaAndApplyImplicitCast(input, sinkSchema, getTypeFactory)
-        LogicalSink.create(
+        LogicalLegacySink.create(
           query,
           s.getSink,
           "UnregisteredSink",
@@ -201,7 +201,7 @@
           TableSchemaUtils.getPhysicalSchema(table.getSchema),
           getTypeFactory,
           Some(catalogSink.getTableIdentifier.asSummaryString()))
-        LogicalSink.create(
+        LogicalLegacySink.create(
           query,
           sink,
           identifier.toString,
@@ -233,7 +233,7 @@
           typeInfo,
           needUpdateBefore,
           withChangeFlag)
-        LogicalSink.create(
+        LogicalLegacySink.create(
           query,
           tableSink,
           "DataStreamTableSink",
2 changes: 1 addition & 1 deletion flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/{Sink.scala → LegacySink.scala}
@@ -35,7 +35,7 @@ import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
  * @param sink Table sink to write into
  * @param sinkName Name of tableSink, which is not required property, that is, it could be null
  */
-abstract class Sink(
+abstract class LegacySink(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    input: RelNode,
14 changes: 7 additions & 7 deletions flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/{LogicalSink.scala → LogicalLegacySink.scala}
@@ -29,36 +29,36 @@ import java.util
 import scala.collection.JavaConversions._
 
 /**
- * Sub-class of [[Sink]] that is a relational expression
+ * Sub-class of [[LegacySink]] that is a relational expression
  * which writes out data of input node into a [[TableSink]].
  * This class corresponds to Calcite logical rel.
  */
-final class LogicalSink(
+final class LogicalLegacySink(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    input: RelNode,
    sink: TableSink[_],
    sinkName: String,
    val catalogTable: CatalogTable,
    val staticPartitions: Map[String, String])
-  extends Sink(cluster, traitSet, input, sink, sinkName) {
+  extends LegacySink(cluster, traitSet, input, sink, sinkName) {
 
   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
-    new LogicalSink(
+    new LogicalLegacySink(
      cluster, traitSet, inputs.head, sink, sinkName, catalogTable, staticPartitions)
   }
 
 }
 
-object LogicalSink {
+object LogicalLegacySink {
 
   def create(input: RelNode,
      sink: TableSink[_],
      sinkName: String,
      catalogTable: CatalogTable = null,
-      staticPartitions: Map[String, String] = Map()): LogicalSink = {
+      staticPartitions: Map[String, String] = Map()): LogicalLegacySink = {
    val traits = input.getCluster.traitSetOf(Convention.NONE)
-    new LogicalSink(
+    new LogicalLegacySink(
      input.getCluster, traits, input, sink, sinkName, catalogTable, staticPartitions)
   }
 }
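(Aside, not part of the commit: the companion object's create above is the planner's entry point for building this node, as the PlannerBase hunks earlier show. A hypothetical direct call, assuming a `query` RelNode and a legacy `tableSink` are in scope — neither is defined by this commit:)

```scala
// Hypothetical call site, mirroring the PlannerBase call sites above.
val sinkRel: LogicalLegacySink = LogicalLegacySink.create(
  input = query,                // RelNode produced for the query
  sink = tableSink,             // legacy TableSink[_] to write into
  sinkName = "MySink",          // name that appears in plan digests
  catalogTable = null,          // null for an unregistered/inline sink
  staticPartitions = Map.empty) // no static partition assignments
```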
30 changes: 15 additions & 15 deletions flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/{FlinkLogicalSink.scala → FlinkLogicalLegacySink.scala}
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.nodes.logical
 
 import org.apache.flink.table.catalog.CatalogTable
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
-import org.apache.flink.table.planner.plan.nodes.calcite.{LogicalSink, Sink}
+import org.apache.flink.table.planner.plan.nodes.calcite.{LogicalLegacySink, LegacySink}
 import org.apache.flink.table.sinks.TableSink
 
 import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRule, RelTraitSet}
@@ -32,38 +32,38 @@
 import scala.collection.JavaConversions._
 
 /**
- * Sub-class of [[Sink]] that is a relational expression
+ * Sub-class of [[LegacySink]] that is a relational expression
  * which writes out data of input node into a [[TableSink]].
  */
-class FlinkLogicalSink(
+class FlinkLogicalLegacySink(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    input: RelNode,
    sink: TableSink[_],
    sinkName: String,
    val catalogTable: CatalogTable,
    val staticPartitions: Map[String, String])
-  extends Sink(cluster, traitSet, input, sink, sinkName)
-  with FlinkLogicalRel {
+  extends LegacySink(cluster, traitSet, input, sink, sinkName)
+  with FlinkLogicalRel {
 
   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
-    new FlinkLogicalSink(
+    new FlinkLogicalLegacySink(
      cluster, traitSet, inputs.head, sink, sinkName, catalogTable, staticPartitions)
   }
 
 }
 
-private class FlinkLogicalSinkConverter
+private class FlinkLogicalLegacySinkConverter
  extends ConverterRule(
-    classOf[LogicalSink],
+    classOf[LogicalLegacySink],
    Convention.NONE,
    FlinkConventions.LOGICAL,
-    "FlinkLogicalSinkConverter") {
+    "FlinkLogicalLegacySinkConverter") {
 
   override def convert(rel: RelNode): RelNode = {
-    val sink = rel.asInstanceOf[LogicalSink]
+    val sink = rel.asInstanceOf[LogicalLegacySink]
    val newInput = RelOptRule.convert(sink.getInput, FlinkConventions.LOGICAL)
-    FlinkLogicalSink.create(
+    FlinkLogicalLegacySink.create(
      newInput,
      sink.sink,
      sink.sinkName,
@@ -72,18 +72,18 @@ private class FlinkLogicalSinkConverter
   }
 }
 
-object FlinkLogicalSink {
-  val CONVERTER: ConverterRule = new FlinkLogicalSinkConverter()
+object FlinkLogicalLegacySink {
+  val CONVERTER: ConverterRule = new FlinkLogicalLegacySinkConverter()
 
   def create(
      input: RelNode,
      sink: TableSink[_],
      sinkName: String,
      catalogTable: CatalogTable = null,
-      staticPartitions: Map[String, String] = Map()): FlinkLogicalSink = {
+      staticPartitions: Map[String, String] = Map()): FlinkLogicalLegacySink = {
    val cluster = input.getCluster
    val traitSet = cluster.traitSetOf(FlinkConventions.LOGICAL).simplify()
-    new FlinkLogicalSink(
+    new FlinkLogicalLegacySink(
      cluster, traitSet, input, sink, sinkName, catalogTable, staticPartitions)
   }
 }
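(Aside, not part of the commit: CONVERTER is the ConverterRule that rewrites a LogicalLegacySink into a FlinkLogicalLegacySink when the plan is converted to FlinkConventions.LOGICAL. A sketch of bundling such a rule into a Calcite rule set, assuming Calcite's standard RuleSets helper:)

```scala
import org.apache.calcite.tools.{RuleSet, RuleSets}

// Hypothetical wiring: a logical-optimization phase includes the
// converter so any LogicalLegacySink node is translated to
// FlinkLogicalLegacySink during conversion to the LOGICAL convention.
val logicalRules: RuleSet = RuleSets.ofList(
  FlinkLogicalLegacySink.CONVERTER
  // ...other FlinkLogical*.CONVERTER rules omitted
)
```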
(The remaining 35 changed files in this commit are not shown here.)