[FLINK-21245][table-planner-blink] Support StreamExecCalc json serialization/deserialization

This closes apache#14878
godfreyhe committed Mar 5, 2021
1 parent 9db31b5 commit b9e02de
Showing 21 changed files with 1,165 additions and 140 deletions.

@@ -101,15 +101,13 @@ public List<InputProperty> getInputProperties() {
return inputProperties;
}

@JsonIgnore
@Override
public List<ExecEdge> getInputEdges() {
return checkNotNull(
inputEdges,
"inputEdges should not null, please call `setInputEdges(List<ExecEdge>)` first.");
}

@JsonIgnore
@Override
public void setInputEdges(List<ExecEdge> inputEdges) {
checkNotNull(inputEdges, "inputEdges should not be null.");

@@ -25,21 +25,29 @@
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.types.logical.RowType;

import org.apache.calcite.rex.RexProgram;
import org.apache.calcite.rex.RexNode;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.List;

/** Batch {@link ExecNode} for Calc. */
public class BatchExecCalc extends CommonExecCalc implements BatchExecNode<RowData> {

public BatchExecCalc(
RexProgram calcProgram,
List<RexNode> projection,
@Nullable RexNode condition,
InputProperty inputProperty,
RowType outputType,
String description) {
super(
calcProgram,
projection,
condition,
TableStreamOperator.class,
false, // retainHeader
inputProperty,
getNewNodeId(),
Collections.singletonList(inputProperty),
outputType,
description);
}

@@ -32,28 +32,49 @@
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexProgram;

import java.util.Collections;
import javax.annotation.Nullable;

import java.util.List;
import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/** Base class for exec Calc. */
@JsonIgnoreProperties(ignoreUnknown = true)
public abstract class CommonExecCalc extends ExecNodeBase<RowData> {
private final RexProgram calcProgram;
private final Class<?> operatorBaseClass;
private final boolean retainHeader;
public static final String FIELD_NAME_PROJECTION = "projection";
public static final String FIELD_NAME_CONDITION = "condition";

@JsonProperty(FIELD_NAME_PROJECTION)
private final List<RexNode> projection;

public CommonExecCalc(
RexProgram calcProgram,
@JsonProperty(FIELD_NAME_CONDITION)
private final @Nullable RexNode condition;

@JsonIgnore private final Class<?> operatorBaseClass;
@JsonIgnore private final boolean retainHeader;

protected CommonExecCalc(
List<RexNode> projection,
@Nullable RexNode condition,
Class<?> operatorBaseClass,
boolean retainHeader,
InputProperty inputProperty,
int id,
List<InputProperty> inputProperties,
RowType outputType,
String description) {
super(Collections.singletonList(inputProperty), outputType, description);
this.calcProgram = calcProgram;
this.operatorBaseClass = operatorBaseClass;
super(id, inputProperties, outputType, description);
checkArgument(inputProperties.size() == 1);
this.projection = checkNotNull(projection);
this.condition = condition;
this.operatorBaseClass = checkNotNull(operatorBaseClass);
this.retainHeader = retainHeader;
}

@@ -67,20 +88,13 @@ protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
new CodeGeneratorContext(planner.getTableConfig())
.setOperatorBaseClass(operatorBaseClass);

final Optional<RexNode> condition;
if (calcProgram.getCondition() != null) {
condition = Optional.of(calcProgram.expandLocalRef(calcProgram.getCondition()));
} else {
condition = Optional.empty();
}

final CodeGenOperatorFactory<RowData> substituteStreamOperator =
CalcCodeGenerator.generateCalcOperator(
ctx,
inputTransform,
(RowType) getOutputType(),
calcProgram,
JavaScalaConversionUtil.toScala(condition),
JavaScalaConversionUtil.toScala(projection),
JavaScalaConversionUtil.toScala(Optional.ofNullable(this.condition)),
retainHeader,
getClass().getSimpleName());
final Transformation<RowData> transformation =
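With the RexProgram parameter removed, local references have to be expanded before a Calc exec node is constructed; presumably the physical-to-exec-node translation (not among the files shown here) does the equivalent expansion. The following is a minimal caller-side sketch; the helper class and its names are illustrative and not part of this commit.

import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexProgram;

import javax.annotation.Nullable;

import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: expands a RexProgram into the (projection, condition)
// pair that the new CommonExecCalc constructor expects.
final class CalcProgramExpander {

    static List<RexNode> expandProjection(RexProgram calcProgram) {
        // expandLocalRef resolves RexLocalRef indirections into plain RexNodes,
        // mirroring the expansion removed from translateToPlanInternal above.
        return calcProgram.getProjectList().stream()
                .map(calcProgram::expandLocalRef)
                .collect(Collectors.toList());
    }

    @Nullable
    static RexNode expandCondition(RexProgram calcProgram) {
        return calcProgram.getCondition() == null
                ? null
                : calcProgram.expandLocalRef(calcProgram.getCondition());
    }
}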

@@ -103,4 +103,8 @@ protected abstract Transformation<RowData> createInputFormatTransformation(
InputFormat<RowData, ?> inputFormat,
InternalTypeInfo<RowData> outputTypeInfo,
String name);

public DynamicTableSourceSpec getTableSourceSpec() {
return tableSourceSpec;
}
}

@@ -21,14 +21,19 @@
import org.apache.flink.streaming.api.transformations.ShuffleMode;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan;
import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor;
import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitorImpl;
import org.apache.flink.table.planner.plan.utils.ReflectionsUtil;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -41,9 +46,13 @@
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.jsontype.NamedType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;

import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -93,13 +102,25 @@ public static ExecNodeGraph generateExecNodeGraph(String jsonPlan, SerdeContext
}

private static void registerSerializers(SimpleModule module) {
// ObjectIdentifierJsonSerializer is also needed for LogicalType serialization
// ObjectIdentifierJsonSerializer is needed for LogicalType serialization
module.addSerializer(new ObjectIdentifierJsonSerializer());
// LogicalTypeJsonSerializer is needed for RelDataType serialization
module.addSerializer(new LogicalTypeJsonSerializer());
// RelDataTypeJsonSerializer is needed for RexNode serialization
module.addSerializer(new RelDataTypeJsonSerializer());
// RexNode is used in many exec nodes, so we register its serializer directly here
module.addSerializer(new RexNodeJsonSerializer());
}

private static void registerDeserializers(SimpleModule module) {
// ObjectIdentifierJsonDeserializer is also needed for LogicalType deserialization
// ObjectIdentifierJsonDeserializer is needed for LogicalType deserialization
module.addDeserializer(ObjectIdentifier.class, new ObjectIdentifierJsonDeserializer());
// LogicalTypeJsonDeserializer is needed for RelDataType deserialization
module.addDeserializer(LogicalType.class, new LogicalTypeJsonDeserializer());
// RelDataTypeJsonDeserializer is needed for RexNode deserialization
module.addDeserializer(RelDataType.class, new RelDataTypeJsonDeserializer());
// RexNode is used in many exec nodes, so we register its deserializer directly here
module.addDeserializer(RexNode.class, new RexNodeJsonDeserializer());
}
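
For context, a minimal sketch of how a SimpleModule populated by the two register methods above is typically attached to a Jackson ObjectMapper. This wiring is assumed for illustration and is not part of the diff shown here; the shaded package names follow this file's imports.

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;

// Hypothetical factory: builds a mapper that can (de)serialize the plan types
// once the serializers/deserializers registered above are added to the module.
public final class PlanMapperFactory {

    public static ObjectMapper createMapper() {
        ObjectMapper mapper = new ObjectMapper();
        SimpleModule module = new SimpleModule();
        // registerSerializers(module) / registerDeserializers(module) are private
        // to the generator, so their calls are only indicated here:
        //   module.addSerializer(new RexNodeJsonSerializer());
        //   module.addDeserializer(RexNode.class, new RexNodeJsonDeserializer());
        mapper.registerModule(module);
        return mapper;
    }
}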

/** Check whether the given {@link ExecNodeGraph} is completely legal. */
@@ -120,11 +141,9 @@ protected void visitNode(ExecNode<?> node) {
// to support serializing/deserializing the push-downs.
if (node instanceof StreamExecTableSourceScan) {
String description = node.getDescription();
if (description.contains("project=[")) {
throw new TableException(
"DynamicTableSource with project push-down is not supported for JSON serialization now.");
}
if (description.contains("filter=[")) {
if (description.contains("filter=[")
// filter=[] means push-down nothing
&& !description.contains("filter=[]")) {
throw new TableException(
"DynamicTableSource with filter push-down is not supported for JSON serialization now.");
}
@@ -228,6 +247,9 @@ public ExecNodeGraph convertToExecNodeGraph() {
"The id: %s is not unique for ExecNode: %s.\nplease check it.",
id, execNode.getDescription()));
}
if (execNode instanceof StreamExecTableSourceScan) {
applyProjectionPushDown((StreamExecTableSourceScan) execNode);
}
idToExecNodes.put(id, execNode);
}
Map<Integer, List<ExecEdge>> idToInputEdges = new HashMap<>();
@@ -347,4 +369,36 @@ public static JsonPlanEdge fromExecEdge(ExecEdge execEdge) {
execEdge.getShuffleMode());
}
}

// TODO this is a temporary solution, we will introduce new interface later
// to support serializing/deserializing the push-downs.
private static void applyProjectionPushDown(StreamExecTableSourceScan scan) {
if (!scan.getDescription().contains("project=[")) {
return;
}
DynamicTableSourceSpec spec = scan.getTableSourceSpec();
ScanTableSource tableSource = spec.getScanTableSource();
if (!(tableSource instanceof SupportsProjectionPushDown)) {
// sanity check
return;
}
SupportsProjectionPushDown projectionPushDown = (SupportsProjectionPushDown) tableSource;
if (projectionPushDown.supportsNestedProjection()) {
throw new TableException(
"DynamicTableSource with nested project push-down is not supported for JSON serialization now.");
}

RowType outputType = (RowType) scan.getOutputType();
List<String> originFieldNames =
Arrays.asList(spec.getCatalogTable().getSchema().getFieldNames());
if (outputType.getFieldCount() == originFieldNames.size()) {
// sanity check
return;
}
int[][] projection = new int[outputType.getFieldCount()][1];
for (int i = 0; i < outputType.getFieldCount(); ++i) {
projection[i] = new int[] {originFieldNames.indexOf(outputType.getFieldNames().get(i))};
}
projectionPushDown.applyProjection(projection);
}
}
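
To make the index array built in applyProjectionPushDown concrete, here is a small self-contained example with hypothetical field names: a catalog schema [a, b, c, d] and a pushed-down scan output of [c, a] yield one single-element index path per retained top-level field.

// Illustrative only; field names are made up.
public final class ProjectionIndexExample {

    public static void main(String[] args) {
        String[] originFieldNames = {"a", "b", "c", "d"}; // catalog schema order
        String[] scanOutputFields = {"c", "a"};           // output type after push-down

        int[][] projection = new int[scanOutputFields.length][1];
        for (int i = 0; i < scanOutputFields.length; ++i) {
            projection[i] = new int[] {
                    java.util.Arrays.asList(originFieldNames).indexOf(scanOutputFields[i])};
        }
        // Prints [[2], [0]]; applyProjection(projection) would then re-apply the
        // push-down so the restored source reads only c and a.
        System.out.println(java.util.Arrays.deepToString(projection));
    }
}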

@@ -25,21 +25,51 @@
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.types.logical.RowType;

import org.apache.calcite.rex.RexProgram;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import org.apache.calcite.rex.RexNode;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.List;

/** Stream {@link ExecNode} for Calc. */
@JsonIgnoreProperties(ignoreUnknown = true)
public class StreamExecCalc extends CommonExecCalc implements StreamExecNode<RowData> {

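// Constructor used during plan translation: assigns a fresh node id and wraps the single input property.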
public StreamExecCalc(
RexProgram calcProgram,
List<RexNode> projection,
@Nullable RexNode condition,
InputProperty inputProperty,
RowType outputType,
String description) {
this(
projection,
condition,
getNewNodeId(),
Collections.singletonList(inputProperty),
outputType,
description);
}

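// Constructor used for deserialization: Jackson invokes it via @JsonCreator, binding arguments by the FIELD_NAME_* keys.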
@JsonCreator
public StreamExecCalc(
@JsonProperty(FIELD_NAME_PROJECTION) List<RexNode> projection,
@JsonProperty(FIELD_NAME_CONDITION) @Nullable RexNode condition,
@JsonProperty(FIELD_NAME_ID) int id,
@JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
@JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
@JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
super(
calcProgram,
projection,
condition,
TableStreamOperator.class,
true, // retainHeader
inputProperty,
id,
inputProperties,
outputType,
description);
}

@@ -38,7 +38,7 @@ object CalcCodeGenerator {
ctx: CodeGeneratorContext,
inputTransform: Transformation[RowData],
outputType: RowType,
calcProgram: RexProgram,
projection: Seq[RexNode],
condition: Option[RexNode],
retainHeader: Boolean = false,
opName: String): CodeGenOperatorFactory[RowData] = {
@@ -52,7 +52,7 @@
inputType,
outputType,
classOf[BoxedWrapperRowData],
calcProgram,
projection,
condition,
eagerInputUnboxingCode = true,
retainHeader = retainHeader,
@@ -75,7 +75,7 @@
name: String,
returnType: RowType,
outRowClass: Class[_ <: RowData],
calcProjection: RexProgram,
calcProjection: Seq[RexNode],
calcCondition: Option[RexNode],
config: TableConfig): GeneratedFunction[FlatMapFunction[RowData, RowData]] = {
val ctx = CodeGeneratorContext(config)
@@ -109,7 +109,7 @@
inputType: RowType,
outRowType: RowType,
outRowClass: Class[_ <: RowData],
calcProgram: RexProgram,
projection: Seq[RexNode],
condition: Option[RexNode],
inputTerm: String = CodeGenUtils.DEFAULT_INPUT1_TERM,
collectorTerm: String = CodeGenUtils.DEFAULT_OPERATOR_COLLECTOR_TERM,
@@ -118,8 +118,6 @@
outputDirectly: Boolean = false,
allowSplit: Boolean = false): String = {

val projection = calcProgram.getProjectList.map(calcProgram.expandLocalRef)

// according to the SQL standard, every table function should also be a scalar function
// but we don't allow that for now
projection.foreach(_.accept(ScalarFunctionsValidator))
@@ -133,13 +131,13 @@
rexNode.isInstanceOf[RexInputRef] && rexNode.asInstanceOf[RexInputRef].getIndex == index
}

def produceOutputCode(resultTerm: String) = if (outputDirectly) {
def produceOutputCode(resultTerm: String): String = if (outputDirectly) {
s"$collectorTerm.collect($resultTerm);"
} else {
s"${OperatorCodeGenerator.generateCollect(resultTerm)}"
}

def produceProjectionCode = {
def produceProjectionCode: String = {
val projectionExprs = projection.map(exprGenerator.generateExpression)
val projectionExpression = exprGenerator.generateResultExpression(
projectionExprs,