[FLINK-3596] DataSet RelNode refactoring
- remove the intermediate Flink RelNode layer and the DataSet rules
- move code generation from rules to DataSet nodes
- remove unused DataSet nodes
- move code generation from join rule to DataSetJoin node
- merge DataSetMap and DataSetReduce into DataSetAggregate
vasia committed Mar 18, 2016 · 1 parent a5d8ae2 · commit b1b0e3f
Showing 29 changed files with 337 additions and 1,233 deletions.
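
The heart of the change: code generation moves out of the translation rules and into the DataSet RelNodes themselves, so each node must be able to translate itself into a DataSet program. A minimal sketch of the DataSetRel contract these nodes share, inferred from the overrides in the diffs below (the default argument is an assumption, based on the one-argument call in DataSetCalc):

```scala
import org.apache.calcite.rel.RelNode
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.table.TableConfig

// Sketch only: the contract each DataSet RelNode fulfills after this commit.
// Code generation now lives behind translateToPlan instead of in the rules.
trait DataSetRel extends RelNode {
  def translateToPlan(
      config: TableConfig,
      expectedType: Option[TypeInformation[Any]] = None): DataSet[Any]
}
```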
JavaBatchTranslator.scala
@@ -76,8 +76,10 @@ class JavaBatchTranslator(config: TableConfig) extends PlanTranslator {
     // optimize the logical Flink plan
     val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)
     val flinkOutputProps = RelTraitSet.createEmpty()
+      .plus(DataSetConvention.INSTANCE)
+      .plus(RelCollations.of()).simplify()
 
-    val optPlan = try {
+    val dataSetPlan = try {
       optProgram.run(planner, decorPlan, flinkOutputProps)
     }
     catch {
@@ -89,30 +91,8 @@ class JavaBatchTranslator(config: TableConfig) extends PlanTranslator {
     }
 
     println("---------------")
-    println("Optimized Plan:")
-    println("---------------")
-    println(RelOptUtil.toString(optPlan))
-
-    // optimize the logical Flink plan
-    val dataSetProgram = Programs.ofRules(FlinkRuleSets.DATASET_TRANS_RULES)
-    val dataSetOutputProps = RelTraitSet.createEmpty()
-      .plus(DataSetConvention.INSTANCE)
-      .plus(RelCollations.of()).simplify()
-
-    val dataSetPlan = try {
-      dataSetProgram.run(planner, optPlan, dataSetOutputProps)
-    }
-    catch {
-      case e: CannotPlanException =>
-        throw new PlanGenException(
-          s"Cannot generate a valid execution plan for the given query: \n\n" +
-          s"${RelOptUtil.toString(lPlan)}\n" +
-          "Please consider filing a bug report.", e)
-    }
-
-    println("-------------")
     println("DataSet Plan:")
-    println("-------------")
+    println("---------------")
     println(RelOptUtil.toString(dataSetPlan))
 
     dataSetPlan match {
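
Because the required output traits now include DataSetConvention.INSTANCE, a single program run both optimizes the logical plan and converts it into DataSet nodes; the separate DATASET_TRANS_RULES pass becomes redundant and is deleted. The pattern, restated as a sketch using the names from the diff above (planner and decorPlan are defined in the surrounding code):

```scala
// One Volcano pass: optimize and satisfy the DataSet convention together.
val program = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)
val requiredTraits = RelTraitSet.createEmpty()
  .plus(DataSetConvention.INSTANCE) // ask for DataSet RelNodes as output
  .plus(RelCollations.of())         // no collation requirement
  .simplify()

// The result is a tree of DataSetRel nodes, ready for translateToPlan.
val dataSetPlan = program.run(planner, decorPlan, requiredTraits)
```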
DataSetGroupReduce.scala -> DataSetAggregate.scala (renamed)
@@ -20,43 +20,46 @@ package org.apache.flink.api.table.plan.nodes.dataset
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.common.functions.GroupReduceFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.plan.TypeConverter._
 import org.apache.flink.api.table.plan.{PlanGenException, TypeConverter}
+import org.apache.flink.api.table.runtime.aggregate.AggregateUtil
+import org.apache.flink.api.table.runtime.aggregate.AggregateUtil.CalcitePair
 import org.apache.flink.api.table.typeinfo.RowTypeInfo
 import org.apache.flink.api.table.{Row, TableConfig}
 
 import scala.collection.JavaConverters._
 
 /**
- * Flink RelNode which matches along with ReduceGroupOperator.
+ * Flink RelNode which matches along with a LogicalAggregate.
  */
-class DataSetGroupReduce(
+class DataSetAggregate(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     input: RelNode,
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
     rowType: RelDataType,
+    inputType: RelDataType,
     opName: String,
-    groupingKeys: Array[Int],
-    func: (TableConfig, TypeInformation[Row], TypeInformation[Row]) =>
-      GroupReduceFunction[Row, Row])
+    grouping: Array[Int])
   extends SingleRel(cluster, traitSet, input)
   with DataSetRel {
 
   override def deriveRowType() = rowType
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
-    new DataSetGroupReduce(
+    new DataSetAggregate(
       cluster,
       traitSet,
       inputs.get(0),
+      namedAggregates,
       rowType,
+      inputType,
       opName,
-      groupingKeys,
-      func
-    )
+      grouping)
   }
 
   override def explainTerms(pw: RelWriter): RelWriter = {
@@ -69,37 +72,43 @@ class DataSetGroupReduce(
 
     expectedType match {
       case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
-        throw new PlanGenException("GroupReduce operations currently only support returning Rows.")
+        throw new PlanGenException("Aggregate operations currently only support returning Rows.")
       case _ => // ok
     }
 
+    val groupingKeys = (0 until grouping.length).toArray
+    // add grouping fields, position keys in the input, and input type
+    val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates,
+      inputType, rowType, grouping, config)
+
     val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(
       config,
       // tell the input operator that this operator currently only supports Rows as input
       Some(TypeConverter.DEFAULT_ROW_TYPE))
 
     // get the output types
     val fieldsNames = rowType.getFieldNames
     val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
       .map(f => f.getType.getSqlTypeName)
       .map(n => TypeConverter.sqlTypeToTypeInfo(n))
       .toArray
 
     val rowTypeInfo = new RowTypeInfo(fieldTypes)
-    val groupReduceFunction =
-      func.apply(config, inputDS.getType.asInstanceOf[RowTypeInfo], rowTypeInfo)
+    val mappedInput = inputDS.map(aggregateResult.mapFunc)
+    val groupReduceFunction = aggregateResult.reduceGroupFunc
 
     if (groupingKeys.length > 0) {
-      inputDS.asInstanceOf[DataSet[Row]]
-        .groupBy(groupingKeys: _*)
-        .reduceGroup(groupReduceFunction)
-        .returns(rowTypeInfo)
-        .asInstanceOf[DataSet[Any]]
+      mappedInput.asInstanceOf[DataSet[Row]]
+        .groupBy(groupingKeys: _*)
+        .reduceGroup(groupReduceFunction)
+        .returns(rowTypeInfo)
+        .asInstanceOf[DataSet[Any]]
    }
     else {
       // global aggregation
-      inputDS.asInstanceOf[DataSet[Row]].reduceGroup(groupReduceFunction)
-        .returns(rowTypeInfo).asInstanceOf[DataSet[Any]]
+      mappedInput.asInstanceOf[DataSet[Row]]
+        .reduceGroup(groupReduceFunction)
+        .returns(rowTypeInfo)
+        .asInstanceOf[DataSet[Any]]
     }
   }
 }
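
The new translateToPlan builds aggregation as two chained operators: a map that projects out grouping keys and pre-processes aggregate fields (aggregateResult.mapFunc), then a grouped reduceGroup that folds each group into one output row (aggregateResult.reduceGroupFunc). A runnable toy example of that same plan shape, with made-up data and stand-in functions:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

// Toy illustration of the map -> groupBy -> reduceGroup shape that
// DataSetAggregate now emits. The map and the reducer below merely play
// the roles of aggregateResult.mapFunc and aggregateResult.reduceGroupFunc.
object AggregatePlanShape {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // (key, amount, payload) records; the payload is projected away.
    val input = env.fromElements(("a", 1, "x"), ("b", 2, "y"), ("a", 3, "z"))

    val result = input
      .map(r => (r._1, r._2)) // role of mapFunc: keep grouping key + agg field
      .groupBy(0)             // role of groupBy(groupingKeys: _*)
      .reduceGroup { (rows: Iterator[(String, Int)], out: Collector[(String, Int)]) =>
        // role of reduceGroupFunc: SUM per key
        val buffered = rows.toSeq
        out.collect((buffered.head._1, buffered.map(_._2).sum))
      }

    result.print()
  }
}
```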
DataSetCalc.scala (new file)
@@ -0,0 +1,152 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.table.plan.nodes.dataset

import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.table.codegen.CodeGenerator
import org.apache.flink.api.table.plan.TypeConverter._
import org.apache.flink.api.table.runtime.FlatMapRunner
import org.apache.flink.api.table.TableConfig
import org.apache.calcite.rex.RexProgram
import scala.collection.JavaConversions._

/**
 * Flink RelNode which matches along with LogicalCalc.
 */
class DataSetCalc(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    input: RelNode,
    rowType: RelDataType,
    calcProgram: RexProgram,
    opName: String,
    ruleDescription: String)
  extends SingleRel(cluster, traitSet, input)
  with DataSetRel {

  override def deriveRowType() = rowType

  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    new DataSetCalc(
      cluster,
      traitSet,
      inputs.get(0),
      rowType,
      calcProgram,
      opName,
      ruleDescription)
  }

  override def explainTerms(pw: RelWriter): RelWriter = {
    super.explainTerms(pw).item("name", opName)
  }

  override def toString = opName

  override def translateToPlan(config: TableConfig,
      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {

    val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config)

    val returnType = determineReturnType(
      getRowType,
      expectedType,
      config.getNullCheck,
      config.getEfficientTypeUsage)

    val generator = new CodeGenerator(config, inputDS.getType)

    val condition = calcProgram.getCondition
    val expandedExpressions = calcProgram.getProjectList.map(
      expr => calcProgram.expandLocalRef(expr))
    val projection = generator.generateResultExpression(
      returnType,
      rowType.getFieldNames,
      expandedExpressions)

    val body = {
      // only projection
      if (condition == null) {
        s"""
          |${projection.code}
          |${generator.collectorTerm}.collect(${projection.resultTerm});
          |""".stripMargin
      }
      else {
        val filterCondition = generator.generateExpression(
          calcProgram.expandLocalRef(calcProgram.getCondition))
        // only filter
        if (projection == null) {
          // conversion
          if (inputDS.getType != returnType) {
            val conversion = generator.generateConverterResultExpression(
              returnType,
              rowType.getFieldNames)

            s"""
              |${filterCondition.code}
              |if (${filterCondition.resultTerm}) {
              |  ${conversion.code}
              |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
              |}
              |""".stripMargin
          }
          // no conversion
          else {
            s"""
              |${filterCondition.code}
              |if (${filterCondition.resultTerm}) {
              |  ${generator.collectorTerm}.collect(${generator.input1Term});
              |}
              |""".stripMargin
          }
        }
        // both filter and projection
        else {
          s"""
            |${filterCondition.code}
            |if (${filterCondition.resultTerm}) {
            |  ${projection.code}
            |  ${generator.collectorTerm}.collect(${projection.resultTerm});
            |}
            |""".stripMargin
        }
      }
    }

    val genFunction = generator.generateFunction(
      ruleDescription,
      classOf[FlatMapFunction[Any, Any]],
      body,
      returnType)

    val mapFunc = new FlatMapRunner[Any, Any](
      genFunction.name,
      genFunction.code,
      genFunction.returnType)

    inputDS.flatMap(mapFunc)
  }
}
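
For intuition, the "both filter and projection" branch generates Java source that behaves like the hand-written flat map below. The predicate and projection here are invented stand-ins for the compiled filterCondition/projection expressions, and the Row accessors assume this era's org.apache.flink.api.table.Row (a positional Product with setField):

```scala
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.table.Row
import org.apache.flink.util.Collector

// Hand-written analogue of the generated "filter + projection" body.
// The real generated code inlines compiled expressions; this sketch
// hard-codes one predicate and one projected field instead.
class HandWrittenCalc extends FlatMapFunction[Row, Row] {
  override def flatMap(in: Row, out: Collector[Row]): Unit = {
    // stands in for ${filterCondition.code} / ${filterCondition.resultTerm}
    val condition = in.productElement(0).asInstanceOf[Int] > 0
    if (condition) {
      // stands in for ${projection.code} / ${projection.resultTerm}
      val result = new Row(1)
      result.setField(0, in.productElement(1))
      out.collect(result) // ${generator.collectorTerm}.collect(...)
    }
  }
}
```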
