Skip to content

Commit

Permalink
rename RelNodeUtil to RelExplainUtil and do some refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
godfreyhe committed Mar 11, 2019
1 parent 66e4acf commit b38ff77
Show file tree
Hide file tree
Showing 26 changed files with 802 additions and 717 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ case class VariableRankRange(rankEndIndex: Int) extends RankRange {
object Rank {
def sortFieldsToString(collationSort: RelCollation): String = {
val fieldCollations = collationSort.getFieldCollations
.map(c => (c.getFieldIndex, RelNodeUtil.directionToOrder(c.getDirection)))
.map(c => (c.getFieldIndex, FlinkRelOptUtil.directionToOrder(c.getDirection)))

fieldCollations.map {
case (index, order) => s"$$$index ${order.getShortName}"
Expand All @@ -181,7 +181,7 @@ object Rank {

def sortFieldsToString(collationSort: RelCollation, inputType: RelDataType): String = {
val fieldCollations = collationSort.getFieldCollations
.map(c => (c.getFieldIndex, RelNodeUtil.directionToOrder(c.getDirection)))
.map(c => (c.getFieldIndex, FlinkRelOptUtil.directionToOrder(c.getDirection)))
val inputFieldNames = inputType.getFieldNames

fieldCollations.map {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.plan.nodes.common

import org.apache.flink.table.plan.nodes.ExpressionFormat.ExpressionFormat
import org.apache.flink.table.plan.nodes.{ExpressionFormat, FlinkRelNode}

import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
import org.apache.calcite.rel.core.Calc
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rex.{RexCall, RexInputRef, RexLiteral, RexProgram}

import scala.collection.JavaConversions._

/**
  * Shared base for the Flink flavours of Calcite's [[Calc]].
  *
  * It provides one cost model and one plan-explain rendering ("select" / "where")
  * so that concrete logical and physical calc nodes do not have to repeat them.
  *
  * @param cluster     cluster the node belongs to, forwarded to [[Calc]]
  * @param traitSet    traits of the node, forwarded to [[Calc]]
  * @param input       input relational expression, forwarded to [[Calc]]
  * @param calcProgram program holding the projections and the optional condition
  */
abstract class CommonCalc(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    input: RelNode,
    calcProgram: RexProgram)
  extends Calc(cluster, traitSet, input, calcProgram)
  with FlinkRelNode {

  /**
    * Estimates the cost of this node as (rows, rows * number of computed projections, 0).
    *
    * @param planner planner whose cost factory builds the resulting cost
    * @param mq      metadata query used to obtain the row count of this node
    * @return the estimated cost of this calc
    */
  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
    val program = getProgram
    // Only real computations are charged: plain field accesses and literals are free.
    // CASTs are skipped as well, because ReduceExpressionsRule already reduces them as far
    // as possible during normalization, so they should not influence the optimization stage.
    val computationCount = program.getProjectList.count { localRef =>
      program.expandLocalRef(localRef) match {
        case _: RexInputRef | _: RexLiteral => false
        case call: RexCall => !call.getOperator.getName.equals("CAST")
        case _ => true
      }
    }
    val rowCount = mq.getRowCount(this)
    // TODO use inputRowCnt to compute cpu cost
    planner.getCostFactory.makeCost(rowCount, rowCount * computationCount, 0)
  }

  /**
    * Writes the input, the projection list ("select") and, only when the program has a
    * condition, the filter ("where") to the given writer.
    *
    * @param pw writer that receives the explain terms
    * @return the writer returned by the last chained call
    */
  override def explainTerms(pw: RelWriter): RelWriter = {
    val withInput = pw.input("input", getInput)
    val withSelect = withInput.item("select", projectionToString())
    withSelect.itemIf("where", conditionToString(), calcProgram.getCondition != null)
  }

  /**
    * Renders the condition of the program.
    *
    * @return the condition as a string, or the empty string if the program has no condition
    */
  protected def conditionToString(): String = {
    val fieldNames = calcProgram.getInputRowType.getFieldNames.toList
    val exprTable = calcProgram.getExprList.toList

    Option(calcProgram.getCondition) match {
      case Some(condition) => getExpressionString(condition, fieldNames, Some(exprTable))
      case None => ""
    }
  }

  /**
    * Renders the projection list of the program as a comma separated string.
    *
    * A projection whose rendered expression differs from its output field name is
    * printed as "expression AS name"; otherwise only the expression is printed.
    *
    * @param expressionFormat format passed through to the expression rendering
    * @return the rendered projections joined by ", "
    */
  protected def projectionToString(
      expressionFormat: ExpressionFormat = ExpressionFormat.Prefix): String = {
    val fieldNames = calcProgram.getInputRowType.getFieldNames.toList
    val exprTable = calcProgram.getExprList.toList
    val outputNames = calcProgram.getOutputRowType.getFieldNames.toList

    val rendered = for {
      (projection, outputName) <- calcProgram.getProjectList.toList.zip(outputNames)
    } yield {
      val expr = getExpressionString(projection, fieldNames, Some(exprTable), expressionFormat)
      if (expr == outputName) expr else s"$expr AS $outputName"
    }
    rendered.mkString(", ")
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
* limitations under the License.
*/

package org.apache.flink.table.plan.nodes.physical.common
package org.apache.flink.table.plan.nodes.common

import org.apache.flink.table.plan.`trait`.FlinkRelDistribution
import org.apache.flink.table.plan.cost.FlinkCost._
import org.apache.flink.table.plan.cost.FlinkCostFactory
import org.apache.flink.table.plan.nodes.FlinkRelNode

import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
import org.apache.calcite.rel.core.Exchange
Expand All @@ -30,14 +31,15 @@ import org.apache.calcite.rel.{RelDistribution, RelNode, RelWriter}
import scala.collection.JavaConverters._

/**
* Base class for physical [[Exchange]].
* Base class for flink [[Exchange]].
*/
abstract class CommonExchange(
cluster: RelOptCluster,
traitSet: RelTraitSet,
relNode: RelNode,
relDistribution: RelDistribution)
extends Exchange(cluster, traitSet, relNode, relDistribution) {
extends Exchange(cluster, traitSet, relNode, relDistribution)
with FlinkRelNode {

override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
val inputRows = mq.getRowCount(input)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.table.plan.nodes.logical

import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.util.RelNodeUtil
import org.apache.flink.table.plan.util.{FlinkRelOptUtil, RelExplainUtil}

import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
Expand Down Expand Up @@ -61,7 +61,7 @@ class FlinkLogicalAggregate(
}

override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
if (getGroupSets.size > 1 || RelNodeUtil.getGroupIdExprIndexes(getAggCallList).nonEmpty) {
if (getGroupSets.size > 1 || FlinkRelOptUtil.getGroupIdExprIndexes(getAggCallList).nonEmpty) {
planner.getCostFactory.makeInfiniteCost()
} else {
val child = this.getInput
Expand Down Expand Up @@ -96,7 +96,7 @@ private class FlinkLogicalAggregateBatchConverter
case _ => true
}

val hasAccurateDistinctCall = RelNodeUtil.containsAccurateDistinctCall(agg.getAggCallList)
val hasAccurateDistinctCall = FlinkRelOptUtil.containsAccurateDistinctCall(agg.getAggCallList)

!hasAccurateDistinctCall && supported
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@
package org.apache.flink.table.plan.nodes.logical

import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.util.RelNodeUtil
import org.apache.flink.table.plan.nodes.common.CommonCalc

import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.Calc
import org.apache.calcite.rel.logical.LogicalCalc
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rex.RexProgram

/**
Expand All @@ -38,24 +37,13 @@ class FlinkLogicalCalc(
traitSet: RelTraitSet,
input: RelNode,
calcProgram: RexProgram)
extends Calc(cluster, traitSet, input, calcProgram)
extends CommonCalc(cluster, traitSet, input, calcProgram)
with FlinkLogicalRel {

override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = {
new FlinkLogicalCalc(cluster, traitSet, child, program)
}

override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
RelNodeUtil.computeCalcCost(this, planner, mq)
}

override def explainTerms(pw: RelWriter): RelWriter = {
val (conditionStr, projectStr) = RelNodeUtil.programToString(calcProgram, getExpressionString)
pw.input("input", getInput)
.item("select", projectStr)
.itemIf("where", conditionStr, calcProgram.getCondition != null)
}

}

private class FlinkLogicalCalcConverter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.table.plan.nodes.logical

import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.util.RelNodeUtil
import org.apache.flink.table.plan.util.{FlinkRelOptUtil, RelExplainUtil}

import org.apache.calcite.plan._
import org.apache.calcite.rel.convert.ConverterRule
Expand All @@ -43,7 +43,7 @@ class FlinkLogicalSort(
extends Sort(cluster, traits, child, collation, offset, fetch)
with FlinkLogicalRel {

private lazy val limitStart: Long = RelNodeUtil.getLimitStart(offset)
private lazy val limitStart: Long = FlinkRelOptUtil.getLimitStart(offset)

override def copy(
traitSet: RelTraitSet,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@

package org.apache.flink.table.plan.nodes.physical.batch

import org.apache.flink.table.plan.util.RelNodeUtil
import org.apache.flink.table.plan.nodes.common.CommonCalc

import org.apache.calcite.plan._
import org.apache.calcite.rel._
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.Calc
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rex.RexProgram

/**
Expand All @@ -36,7 +35,7 @@ class BatchExecCalc(
inputRel: RelNode,
calcProgram: RexProgram,
outputRowType: RelDataType)
extends Calc(cluster, traitSet, inputRel, calcProgram)
extends CommonCalc(cluster, traitSet, inputRel, calcProgram)
with BatchPhysicalRel {

override def deriveRowType(): RelDataType = outputRowType
Expand All @@ -45,15 +44,4 @@ class BatchExecCalc(
new BatchExecCalc(cluster, traitSet, child, program, outputRowType)
}

override def explainTerms(pw: RelWriter): RelWriter = {
val (conditionStr, projectStr) = RelNodeUtil.programToString(calcProgram, getExpressionString)
pw.input("input", getInput)
.item("select", projectStr)
.itemIf("where", conditionStr, calcProgram.getCondition != null)
}

override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
RelNodeUtil.computeCalcCost(this, planner, mq)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.flink.table.plan.nodes.physical.batch

import org.apache.flink.table.plan.nodes.physical.common.CommonExchange
import org.apache.flink.table.plan.nodes.common.CommonExchange

import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.{RelDistribution, RelNode}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.flink.table.plan.nodes.physical.batch

import org.apache.flink.table.plan.nodes.calcite.Expand
import org.apache.flink.table.plan.util.RelNodeUtil
import org.apache.flink.table.plan.util.RelExplainUtil

import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
Expand Down Expand Up @@ -53,7 +53,7 @@ class BatchExecExpand(

override def explainTerms(pw: RelWriter): RelWriter = {
super.explainTerms(pw)
.item("projects", RelNodeUtil.projectsToString(projects, input.getRowType, getRowType))
.item("projects", RelExplainUtil.projectsToString(projects, input.getRowType, getRowType))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.flink.table.plan.nodes.physical.batch

import org.apache.flink.table.functions.UserDefinedFunction
import org.apache.flink.table.plan.util.RelNodeUtil
import org.apache.flink.table.plan.util.RelExplainUtil

import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
Expand Down Expand Up @@ -74,10 +74,10 @@ class BatchExecHashAggregate(
override def explainTerms(pw: RelWriter): RelWriter = {
super.explainTerms(pw)
.item("isMerge", isMerge)
.itemIf("groupBy", RelNodeUtil.fieldToString(grouping, inputRowType), grouping.nonEmpty)
.itemIf("auxGrouping", RelNodeUtil.fieldToString(auxGrouping, inputRowType),
.itemIf("groupBy", RelExplainUtil.fieldToString(grouping, inputRowType), grouping.nonEmpty)
.itemIf("auxGrouping", RelExplainUtil.fieldToString(auxGrouping, inputRowType),
auxGrouping.nonEmpty)
.item("select", RelNodeUtil.groupAggregationToString(
.item("select", RelExplainUtil.groupAggregationToString(
inputRowType,
getRowType,
grouping,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.flink.table.plan.nodes.physical.batch

import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
import org.apache.flink.table.plan.util.RelNodeUtil
import org.apache.flink.table.plan.util.FlinkRelOptUtil
import org.apache.flink.table.typeutils.BinaryRowSerializer

import org.apache.calcite.plan._
Expand All @@ -43,7 +43,7 @@ trait BatchExecHashJoinBase extends BatchExecJoinBase {
var haveInsertRf: Boolean

private val (leftKeys, rightKeys) =
RelNodeUtil.checkAndGetJoinKeys(keyPairs, getLeft, getRight, allowEmptyKey = true)
FlinkRelOptUtil.checkAndGetJoinKeys(keyPairs, getLeft, getRight, allowEmptyKey = true)
val (buildKeys, probeKeys) = if (leftIsBuild) (leftKeys, rightKeys) else (rightKeys, leftKeys)

// Inputs could be changed. See [[BiRel.replaceInput]].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.flink.table.plan.nodes.physical.batch

import org.apache.flink.table.plan.FlinkJoinRelType
import org.apache.flink.table.plan.util.RelNodeUtil
import org.apache.flink.table.plan.util.RelExplainUtil

import org.apache.calcite.rel.RelWriter
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
Expand Down Expand Up @@ -63,8 +63,10 @@ trait BatchExecJoinBase extends Join with BatchPhysicalRel {

override def explainTerms(pw: RelWriter): RelWriter = {
pw.input("left", getLeft).input("right", getRight)
RelNodeUtil.joinExplainTerms(
pw, getCondition, flinkJoinType, inputRowType, getRowType, getExpressionString)
.item("where",
RelExplainUtil.expressionToString(getCondition, inputRowType, getExpressionString))
.item("join", getRowType.getFieldNames.mkString(", "))
.item("joinType", RelExplainUtil.joinTypeToString(flinkJoinType))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.flink.table.plan.nodes.physical.batch

import org.apache.flink.table.plan.cost.FlinkCost._
import org.apache.flink.table.plan.cost.FlinkCostFactory
import org.apache.flink.table.plan.util.RelNodeUtil
import org.apache.flink.table.plan.util.{FlinkRelOptUtil, RelExplainUtil}

import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
import org.apache.calcite.rel._
Expand Down Expand Up @@ -48,8 +48,8 @@ class BatchExecLimit(
fetch)
with BatchPhysicalRel {

private lazy val limitStart: Long = RelNodeUtil.getLimitStart(offset)
private lazy val limitEnd: Long = RelNodeUtil.getLimitEnd(offset, fetch)
private lazy val limitStart: Long = FlinkRelOptUtil.getLimitStart(offset)
private lazy val limitEnd: Long = FlinkRelOptUtil.getLimitEnd(offset, fetch)

override def copy(
traitSet: RelTraitSet,
Expand All @@ -63,7 +63,7 @@ class BatchExecLimit(
override def explainTerms(pw: RelWriter): RelWriter = {
pw.input("input", getInput)
.item("offset", limitStart)
.item("fetch", RelNodeUtil.fetchToString(fetch))
.item("fetch", RelExplainUtil.fetchToString(fetch))
.item("global", isGlobal)
}

Expand Down
Loading

0 comments on commit b38ff77

Please sign in to comment.