[FLINK-20437][table-planner-blink] Move the utility methods in ExecNode into ExecNodeUtil
godfreyhe committed Dec 4, 2020
1 parent e10e548 commit af807df
Showing 25 changed files with 158 additions and 105 deletions.
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.nodes.exec.utils;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;

import java.util.Optional;

/**
* A utility class that helps translate {@link ExecNode} to {@link Transformation}.
*/
public class ExecNodeUtil {

/**
* Declares the given memory size in bytes as the managed memory weight of the transformation via {@link Transformation#declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase, int)}.
*/
public static <T> Transformation<T> setManagedMemoryWeight(
Transformation<T> transformation,
long memoryBytes) {
// Declare the weight in KiB rather than bytes: a raw byte count can easily
// overflow the int-typed weight. Budgets under 1 KiB round up to 1, and a
// zero budget declares no weight at all.
if (memoryBytes > 0) {
int memoryKibiBytes = (int) Math.max(1, (memoryBytes >> 10));
Optional<Integer> previousWeight = transformation.declareManagedMemoryUseCaseAtOperatorScope(
ManagedMemoryUseCase.BATCH_OP, memoryKibiBytes);
if (previousWeight.isPresent()) {
throw new TableException("Managed memory weight has already been set; this should not happen.");
}
}
return transformation;
}

/**
* Creates a {@link OneInputTransformation} with the given managed memory weight in bytes.
*/
public static <T> OneInputTransformation<T, T> createOneInputTransformation(
Transformation<T> input,
String name,
StreamOperatorFactory<T> operatorFactory,
TypeInformation<T> outputType,
int parallelism,
long memoryBytes) {
OneInputTransformation<T, T> transformation = new OneInputTransformation<>(
input, name, operatorFactory, outputType, parallelism);
setManagedMemoryWeight(transformation, memoryBytes);
return transformation;
}

/**
* Creates a {@link TwoInputTransformation} with the given managed memory weight in bytes.
*/
public static <T> TwoInputTransformation<T, T, T> createTwoInputTransformation(
Transformation<T> input1,
Transformation<T> input2,
String name,
StreamOperatorFactory<T> operatorFactory,
TypeInformation<T> outputType,
int parallelism,
long memoryBytes) {
TwoInputTransformation<T, T, T> transformation = new TwoInputTransformation<>(
input1, input2, name, operatorFactory, outputType, parallelism);
setManagedMemoryWeight(transformation, memoryBytes);
return transformation;
}
}
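
For illustration only, here is a minimal sketch of a call site for the new utility. It is not part of this commit: the example class, the translate method, and the operator name are hypothetical, and the input transformation, operator factory, and output row type are assumed to be supplied by the surrounding translation code.

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;

/** Hypothetical call site showing how an exec node might use ExecNodeUtil. */
public class ExecNodeUtilExample {

    static OneInputTransformation<RowData, RowData> translate(
            Transformation<RowData> inputTransform,
            StreamOperatorFactory<RowData> operatorFactory,
            RowType outputRowType) {
        // Reserve 32 MiB of managed memory; ExecNodeUtil converts the byte
        // count to KiB before declaring the operator-scoped weight.
        long managedMemoryBytes = 32L * 1024 * 1024;
        return ExecNodeUtil.createOneInputTransformation(
                inputTransform,
                "HypotheticalBatchOperator",
                operatorFactory,
                InternalTypeInfo.of(outputRowType),
                inputTransform.getParallelism(),
                managedMemoryBytes); // pass 0 for operators without managed memory
    }
}

Operators without a managed memory budget, such as BatchExecCalc in this commit, pass 0 explicitly, which leaves the weight undeclared.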
@@ -18,9 +18,6 @@

package org.apache.flink.table.planner.codegen

import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.JoinRelType
import org.apache.calcite.rex._
import org.apache.flink.api.common.functions.Function
import org.apache.flink.api.dag.Transformation
import org.apache.flink.streaming.api.functions.ProcessFunction
@@ -32,13 +29,17 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.codegen.CodeGenUtils._
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
import org.apache.flink.table.planner.functions.utils.TableSqlFunction
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
import org.apache.flink.table.runtime.util.StreamRecordCollector
import org.apache.flink.table.types.logical.RowType

import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.JoinRelType
import org.apache.calcite.rex._

import scala.collection.JavaConversions._

object CorrelateCodeGenerator {
@@ -110,12 +111,13 @@ object CorrelateCodeGenerator {
classOf[ProcessFunction[RowData, RowData]],
retainHeader)

ExecNode.createOneInputTransformation(
ExecNodeUtil.createOneInputTransformation(
inputTransformation,
transformationName,
substituteStreamOperator,
InternalTypeInfo.of(returnType),
parallelism)
parallelism,
0)
}

/**
@@ -35,7 +35,7 @@ import org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator._
import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, LookupJoinCodeGenerator}
import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.{getParamClassesConsiderVarArgs, getUserDefinedMethod, signatureToString, signaturesToString}
import org.apache.flink.table.planner.plan.nodes.FlinkRelNode
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil
import org.apache.flink.table.planner.plan.schema.{LegacyTableSourceTable, TableSourceTable}
import org.apache.flink.table.planner.plan.utils.LookupJoinUtil._
import org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall
@@ -429,12 +429,13 @@ abstract class CommonLookupJoin(
SimpleOperatorFactory.of(new ProcessOperator(processFunc))
}

ExecNode.createOneInputTransformation(
ExecNodeUtil.createOneInputTransformation(
inputTransformation,
getRelDetailedDescription,
operatorFactory,
InternalTypeInfo.of(resultRowType),
inputTransformation.getParallelism)
inputTransformation.getParallelism,
0)
}

private def rowTypeEquals(expected: TypeInformation[_], actual: TypeInformation[_]): Boolean = {
@@ -18,12 +18,7 @@

package org.apache.flink.table.planner.plan.nodes.exec

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.dag.Transformation
import org.apache.flink.core.memory.ManagedMemoryUseCase
import org.apache.flink.streaming.api.operators.StreamOperatorFactory
import org.apache.flink.streaming.api.transformations.{OneInputTransformation, TwoInputTransformation}
import org.apache.flink.table.api.TableException
import org.apache.flink.table.delegation.Planner
import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel

@@ -78,60 +73,3 @@ trait ExecNode[T] {
def accept(visitor: ExecNodeVisitor)

}

object ExecNode {

/**
* Set memoryBytes to Transformation.setManagedMemoryWeight.
*/
def setManagedMemoryWeight[T](
transformation: Transformation[T],
memoryBytes: Long = 0): Transformation[T] = {

// Using Bytes can easily overflow
// Using KibiBytes to cast to int
// Careful about zero
if (memoryBytes != 0) {
val memoryKibiBytes = if (memoryBytes == 0) 0 else Math.max(1, (memoryBytes >> 10).toInt)
val previousWeight = transformation.declareManagedMemoryUseCaseAtOperatorScope(
ManagedMemoryUseCase.BATCH_OP, memoryKibiBytes)
if (previousWeight.isPresent) {
throw new TableException("Managed memory weight has been set, this should not happen.")
}
}
transformation
}

/**
* Create a [[OneInputTransformation]] with memoryBytes.
*/
def createOneInputTransformation[T](
input: Transformation[T],
name: String,
operatorFactory: StreamOperatorFactory[T],
outputType: TypeInformation[T],
parallelism: Int,
memoryBytes: Long = 0): OneInputTransformation[T, T] = {
val ret = new OneInputTransformation[T, T](
input, name, operatorFactory, outputType, parallelism)
setManagedMemoryWeight(ret, memoryBytes)
ret
}

/**
* Create a [[TwoInputTransformation]] with memoryBytes.
*/
def createTwoInputTransformation[T](
input1: Transformation[T],
input2: Transformation[T],
name: String,
operatorFactory: StreamOperatorFactory[T],
outputType: TypeInformation[T],
parallelism: Int,
memoryBytes: Long = 0): TwoInputTransformation[T, T, T] = {
val ret = new TwoInputTransformation[T, T, T](
input1, input2, name, operatorFactory, outputType, parallelism)
setManagedMemoryWeight(ret, memoryBytes)
ret
}
}
@@ -23,7 +23,7 @@ import org.apache.flink.table.data.RowData
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.codegen.{CalcCodeGenerator, CodeGeneratorContext}
import org.apache.flink.table.planner.delegation.BatchPlanner
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo

import org.apache.calcite.plan._
@@ -68,11 +68,12 @@ class BatchExecCalc(
opName = "BatchCalc"
)

ExecNode.createOneInputTransformation(
ExecNodeUtil.createOneInputTransformation(
inputTransform,
getRelDetailedDescription,
operator,
InternalTypeInfo.of(outputType),
inputTransform.getParallelism)
inputTransform.getParallelism,
0)
}
}
@@ -24,6 +24,7 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, ExpandCodeGenerator}
import org.apache.flink.table.planner.delegation.BatchPlanner
import org.apache.flink.table.planner.plan.nodes.calcite.Expand
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil
import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecEdge, ExecNode}
import org.apache.flink.table.planner.plan.utils.RelExplainUtil
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
@@ -98,11 +99,12 @@ class BatchExecExpand(
projects,
opName = "BatchExpand")

ExecNode.createOneInputTransformation(
ExecNodeUtil.createOneInputTransformation(
inputTransform,
getRelDetailedDescription,
operator,
InternalTypeInfo.of(outputType),
inputTransform.getParallelism)
inputTransform.getParallelism,
0)
}
}
@@ -28,6 +28,7 @@ import org.apache.flink.table.planner.codegen.agg.batch.{AggWithoutKeysCodeGener
import org.apache.flink.table.planner.delegation.BatchPlanner
import org.apache.flink.table.planner.plan.cost.FlinkCost._
import org.apache.flink.table.planner.plan.cost.FlinkCostFactory
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil
import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode}
import org.apache.flink.table.planner.plan.utils.AggregateUtil.transformToBatchAggregateInfoList
import org.apache.flink.table.planner.plan.utils.FlinkRelMdUtil
@@ -140,7 +141,7 @@ abstract class BatchExecHashAggregateBase(
).genWithKeys()
}
val operator = new CodeGenOperatorFactory[RowData](generatedOperator)
ExecNode.createOneInputTransformation(
ExecNodeUtil.createOneInputTransformation(
input,
getRelDetailedDescription,
operator,
@@ -30,6 +30,7 @@ import org.apache.flink.table.planner.delegation.BatchPlanner
import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef}
import org.apache.flink.table.planner.plan.cost.{FlinkCost, FlinkCostFactory}
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil
import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
import org.apache.flink.table.planner.plan.utils.{FlinkRelMdUtil, JoinUtil}
import org.apache.flink.table.runtime.operators.join.{HashJoinOperator, HashJoinType}
@@ -279,7 +280,7 @@ class BatchExecHashJoin(

val managedMemory = MemorySize.parse(config.getConfiguration.getString(
ExecutionConfigOptions.TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY)).getBytes
ExecNode.createTwoInputTransformation(
ExecNodeUtil.createTwoInputTransformation(
build,
probe,
getRelDetailedDescription,
@@ -30,6 +30,7 @@ import org.apache.flink.table.planner.codegen.agg.batch.{HashWindowCodeGenerator
import org.apache.flink.table.planner.delegation.BatchPlanner
import org.apache.flink.table.planner.plan.cost.{FlinkCost, FlinkCostFactory}
import org.apache.flink.table.planner.plan.logical.LogicalWindow
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil
import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode}
import org.apache.flink.table.planner.plan.utils.AggregateUtil.transformToBatchAggregateInfoList
import org.apache.flink.table.planner.plan.utils.FlinkRelMdUtil
@@ -142,7 +143,7 @@ abstract class BatchExecHashWindowAggregateBase(

val managedMemory = MemorySize.parse(config.getConfiguration.getString(
ExecutionConfigOptions.TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY)).getBytes
ExecNode.createOneInputTransformation(
ExecNodeUtil.createOneInputTransformation(
input,
getRelDetailedDescription,
operator,
@@ -26,6 +26,7 @@ import org.apache.flink.table.planner.codegen.SinkCodeGenerator._
import org.apache.flink.table.planner.codegen.{CodeGenUtils, CodeGeneratorContext}
import org.apache.flink.table.planner.delegation.BatchPlanner
import org.apache.flink.table.planner.plan.nodes.calcite.LegacySink
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil
import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecEdge, ExecNode}
import org.apache.flink.table.planner.plan.utils.UpdatingPlanChecker
import org.apache.flink.table.planner.sinks.DataStreamTableSink
@@ -138,12 +139,13 @@ class BatchExecLegacySink[T](
withChangeFlag,
"SinkConversion"
)
ExecNode.createOneInputTransformation(
ExecNodeUtil.createOneInputTransformation(
plan,
s"SinkConversionTo${resultDataType.getConversionClass.getSimpleName}",
converterOperator,
outputTypeInfo,
plan.getParallelism)
plan.getParallelism,
0)
}
case _ =>
throw new TableException("Cannot generate BoundedStream due to an invalid logical plan. " +
@@ -159,8 +159,7 @@ class BatchExecLegacyTableSourceScan(
// to read multiple partitions which are multiple paths.
// We can use InputFormatSourceFunction directly to support InputFormat.
val func = new InputFormatSourceFunction[IN](format, t)
ExecNode.setManagedMemoryWeight(env.addSource(func, tableSource.explainSource(), t)
.getTransformation)
env.addSource(func, tableSource.explainSource(), t).getTransformation
}

private def computeIndexMapping()
@@ -24,6 +24,7 @@ import org.apache.flink.table.data.RowData
import org.apache.flink.table.planner.delegation.BatchPlanner
import org.apache.flink.table.planner.plan.cost.FlinkCost._
import org.apache.flink.table.planner.plan.cost.FlinkCostFactory
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil
import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecEdge, ExecNode}
import org.apache.flink.table.planner.plan.utils.RelExplainUtil.fetchToString
import org.apache.flink.table.planner.plan.utils.SortUtil
@@ -106,11 +107,12 @@ class BatchExecLimit(
.asInstanceOf[Transformation[RowData]]
val inputType = input.getOutputType
val operator = new LimitOperator(isGlobal, limitStart, limitEnd)
ExecNode.createOneInputTransformation(
ExecNodeUtil.createOneInputTransformation(
input,
getRelDetailedDescription,
SimpleOperatorFactory.of(operator),
inputType,
input.getParallelism)
input.getParallelism,
0)
}
}
