Skip to content

Commit

Permalink
[FLINK-21093][table-planner-blink] Support StreamExecTableSource json…
Browse files Browse the repository at this point in the history
… serialization/deserialization

This closes apache#14729
  • Loading branch information
godfreyhe committed Feb 3, 2021
1 parent e16e45f commit c37905e
Show file tree
Hide file tree
Showing 33 changed files with 2,138 additions and 54 deletions.
6 changes: 6 additions & 0 deletions flink-table/flink-table-planner-blink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ under the License.
<scope>test</scope>
</dependency>

<!-- utility to scan classpaths -->
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
Expand Down Expand Up @@ -418,6 +419,11 @@ under the License.
<shadedPattern>org.apache.flink.table.shaded.org.codehaus</shadedPattern>
</relocation>-->

<relocation>
<pattern>org.reflections</pattern>
<shadedPattern>org.apache.flink.table.shaded.org.reflections</shadedPattern>
</relocation>

</relocations>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,40 @@
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonDeserializer;
import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer;
import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor;
import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;

import java.util.List;

/**
* The representation of execution information for a {@link FlinkPhysicalRel}.
*
* @param <T> The type of the elements that result from this node.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "class")
public interface ExecNode<T> {

String FIELD_NAME_ID = "id";
String FIELD_NAME_DESCRIPTION = "description";
String FIELD_NAME_INPUT_PROPERTIES = "inputProperties";
String FIELD_NAME_OUTPUT_TYPE = "outputType";

/** Gets the ID of this node. */
@JsonProperty(value = FIELD_NAME_ID)
int getId();

/** Returns a string which describes this node. */
@JsonProperty(value = FIELD_NAME_DESCRIPTION)
String getDescription();

/**
Expand All @@ -46,6 +65,9 @@ public interface ExecNode<T> {
* to the JavaDoc of {@link RowData} for more info about mapping of logical types to internal
* data structures.
*/
@JsonProperty(value = FIELD_NAME_OUTPUT_TYPE)
@JsonSerialize(using = LogicalTypeJsonSerializer.class)
@JsonDeserialize(using = LogicalTypeJsonDeserializer.class)
LogicalType getOutputType();

/**
Expand All @@ -55,13 +77,15 @@ public interface ExecNode<T> {
*
* @return List of this node's input properties.
*/
@JsonProperty(value = FIELD_NAME_INPUT_PROPERTIES)
List<InputProperty> getInputProperties();

/**
* Returns a list of this node's input {@link ExecEdge}s.
*
* <p>NOTE: If there are no inputs, returns an empty list, not null.
*/
@JsonIgnore
List<ExecEdge> getInputEdges();

/**
Expand All @@ -71,6 +95,7 @@ public interface ExecNode<T> {
*
* @param inputEdges the input {@link ExecEdge}s.
*/
@JsonIgnore
void setInputEdges(List<ExecEdge> inputEdges);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor;
import org.apache.flink.table.types.logical.LogicalType;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;

import java.util.ArrayList;
import java.util.List;

Expand All @@ -36,22 +39,53 @@
*
* @param <T> The type of the elements that result from this node.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public abstract class ExecNodeBase<T> implements ExecNode<T> {

private final String description;
private final List<InputProperty> inputProperties;
private final LogicalType outputType;
private List<ExecEdge> inputEdges;
/** The unique identifier for each ExecNode in the json plan. */
@JsonIgnore private final int id;

@JsonIgnore private final String description;

@JsonIgnore private final LogicalType outputType;

@JsonIgnore private final List<InputProperty> inputProperties;

private transient Transformation<T> transformation;
@JsonIgnore private List<ExecEdge> inputEdges;

@JsonIgnore private transient Transformation<T> transformation;

/** This is used to assign a unique ID to every ExecNode. */
private static Integer idCounter = 0;

/**
 * Generates a unique ID for an {@link ExecNode}.
 *
 * <p>Declared {@code synchronized} because the backing counter is a mutable static field and
 * {@code idCounter++} is not atomic; without synchronization two threads building plans
 * concurrently could be handed the same ID. NOTE(review): this assumes all ID assignment goes
 * through this method — confirm nothing else writes {@code idCounter} directly.
 *
 * @return a freshly allocated node ID, strictly increasing across calls
 */
public static synchronized int getNewNodeId() {
    idCounter++;
    return idCounter;
}

/**
 * Constructor used during JSON plan deserialization ("json creator"): the node ID is restored
 * from the serialized plan instead of being newly generated.
 *
 * @param id unique identifier of this node within the JSON plan
 * @param inputProperties properties of each input of this node, must not be null
 * @param outputType logical type of the records this node produces, must not be null
 * @param description human-readable description of this node, must not be null
 */
protected ExecNodeBase(
        int id,
        List<InputProperty> inputProperties,
        LogicalType outputType,
        String description) {
    this.id = id;
    this.inputProperties = checkNotNull(inputProperties);
    this.outputType = checkNotNull(outputType);
    this.description = checkNotNull(description);
}

/**
 * Constructor for nodes created during planning: assigns a fresh node ID via
 * {@link #getNewNodeId()} and delegates to the full constructor.
 */
protected ExecNodeBase(
        List<InputProperty> inputProperties, LogicalType outputType, String description) {
    this(getNewNodeId(), inputProperties, outputType, description);
}

/** Returns the unique ID of this node; {@code final} so subclasses cannot break ID stability. */
@Override
public final int getId() {
    return id;
}

@Override
public String getDescription() {
return description;
Expand All @@ -67,13 +101,15 @@ public List<InputProperty> getInputProperties() {
return inputProperties;
}

/**
 * Returns this node's input edges.
 *
 * <p>The edges are not part of the serialized plan ({@link JsonIgnore}) and must be wired up
 * after construction.
 *
 * @throws NullPointerException if the edges have not been set yet via
 *     {@code setInputEdges(List)}
 */
@JsonIgnore
@Override
public List<ExecEdge> getInputEdges() {
    // Fix: original message read "should not null" — missing verb made the error confusing.
    return checkNotNull(
            inputEdges,
            "inputEdges should not be null, please call `setInputEdges(List<ExecEdge>)` first.");
}

@JsonIgnore
@Override
public void setInputEdges(List<ExecEdge> inputEdges) {
checkNotNull(inputEdges, "inputEdges should not be null.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,18 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.table.planner.plan.nodes.exec.serde.RequiredDistributionJsonDeserializer;
import org.apache.flink.table.planner.plan.nodes.exec.serde.RequiredDistributionJsonSerializer;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;

import java.util.Arrays;
import java.util.Objects;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand All @@ -32,17 +42,53 @@
* <p>The input concept is not corresponding to the execution edge, but rather to the {@link Input}.
*/
@Internal
@JsonIgnoreProperties(ignoreUnknown = true)
public class InputProperty {

/** The input does not require any specific data distribution. */
public static final RequiredDistribution ANY_DISTRIBUTION =
new RequiredDistribution(DistributionType.ANY) {};

/**
* The input will read all records for each parallelism of the target node. All records appear
* in each parallelism.
*/
public static final RequiredDistribution BROADCAST_DISTRIBUTION =
new RequiredDistribution(DistributionType.BROADCAST) {};

/** The input will read all records, and the parallelism of the target node must be 1. */
public static final RequiredDistribution SINGLETON_DISTRIBUTION =
new RequiredDistribution(DistributionType.SINGLETON) {};

/**
* Returns a place-holder required distribution.
*
* <p>Currently {@link InputProperty} is only used for deadlock breakup and multi-input in batch
* mode, so for {@link ExecNode}s not affecting the algorithm we use this place-holder.
*
* <p>We should fill out the detailed {@link InputProperty} for each sub-class of {@link
* ExecNode} in the future.
*/
public static final RequiredDistribution UNKNOWN_DISTRIBUTION =
new RequiredDistribution(DistributionType.UNKNOWN) {};

public static final InputProperty DEFAULT = InputProperty.builder().build();

public static final String FIELD_NAME_REQUIRED_DISTRIBUTION = "requiredDistribution";
public static final String FIELD_NAME_DAM_BEHAVIOR = "damBehavior";
public static final String FIELD_NAME_PRIORITY = "priority";

/**
* The required input data distribution when the target {@link ExecNode} read data in from the
* corresponding input.
*/
@JsonProperty(FIELD_NAME_REQUIRED_DISTRIBUTION)
@JsonSerialize(using = RequiredDistributionJsonSerializer.class)
@JsonDeserialize(using = RequiredDistributionJsonDeserializer.class)
private final RequiredDistribution requiredDistribution;

/** How does the input record trigger the output behavior of the target {@link ExecNode}. */
@JsonProperty(FIELD_NAME_DAM_BEHAVIOR)
private final DamBehavior damBehavior;

/**
Expand All @@ -51,27 +97,54 @@ public class InputProperty {
* <p>The smaller the integer, the higher the priority. Same integer indicates the same
* priority.
*/
@JsonProperty(FIELD_NAME_PRIORITY)
private final int priority;

private InputProperty(
RequiredDistribution requiredDistribution, DamBehavior damBehavior, int priority) {
this.requiredDistribution = requiredDistribution;
this.damBehavior = damBehavior;
/**
 * Creates an {@link InputProperty}; annotated with {@code @JsonCreator} so Jackson uses this
 * constructor when deserializing a JSON plan.
 *
 * @param requiredDistribution required data distribution of this input, must not be null
 * @param damBehavior how input records trigger the output of the target node, must not be null
 * @param priority read priority of this input (smaller value = higher priority)
 */
@JsonCreator
public InputProperty(
        @JsonProperty(FIELD_NAME_REQUIRED_DISTRIBUTION)
                RequiredDistribution requiredDistribution,
        @JsonProperty(FIELD_NAME_DAM_BEHAVIOR) DamBehavior damBehavior,
        @JsonProperty(FIELD_NAME_PRIORITY) int priority) {
    this.requiredDistribution = checkNotNull(requiredDistribution);
    this.damBehavior = checkNotNull(damBehavior);
    this.priority = priority;
}

/** Returns the required data distribution of this input. */
@JsonIgnore
public RequiredDistribution getRequiredDistribution() {
    return requiredDistribution;
}

/** Returns the dam behavior of this input. */
@JsonIgnore
public DamBehavior getDamBehavior() {
    return damBehavior;
}

/** Returns the read priority of this input; smaller integers mean higher priority. */
@JsonIgnore
public int getPriority() {
    return priority;
}

/**
 * Two {@link InputProperty} instances are equal iff all three components — priority, dam
 * behavior and required distribution — match and the runtime classes are identical.
 */
@Override
public boolean equals(Object o) {
    if (o == this) {
        return true;
    }
    if (o == null || o.getClass() != getClass()) {
        return false;
    }
    final InputProperty other = (InputProperty) o;
    return priority == other.priority
            && damBehavior == other.damBehavior
            && requiredDistribution.equals(other.requiredDistribution);
}

/** Consistent with {@link #equals(Object)}: hashes the same three components. */
@Override
public int hashCode() {
    return Objects.hash(requiredDistribution, damBehavior, priority);
}

@Override
public String toString() {
return "InputProperty{"
Expand Down Expand Up @@ -132,6 +205,23 @@ public DistributionType getType() {
return type;
}

/**
 * Equality is based on the distribution type, restricted to the exact runtime class.
 *
 * <p>NOTE(review): the singleton constants (e.g. {@code ANY_DISTRIBUTION}) are anonymous
 * subclasses, so a separately constructed instance carrying the same type would have a
 * different runtime class and compare unequal — confirm deserialization always returns the
 * shared constants.
 */
@Override
public boolean equals(Object o) {
    if (o == this) {
        return true;
    }
    if (o == null || o.getClass() != getClass()) {
        return false;
    }
    return type == ((RequiredDistribution) o).type;
}

/** Consistent with {@link #equals(Object)}: hashes the distribution type only. */
@Override
public int hashCode() {
    return Objects.hash(type);
}

@Override
public String toString() {
return type.name();
Expand All @@ -155,39 +245,34 @@ public int[] getKeys() {
return keys;
}

@Override
public boolean equals(Object o) {
    if (this == o) {
        return true;
    }
    if (o == null || getClass() != o.getClass()) {
        return false;
    }
    // Delegate to the superclass first so the distribution type is compared as well.
    if (!super.equals(o)) {
        return false;
    }
    HashDistribution that = (HashDistribution) o;
    // Key order matters for hash distribution, hence the order-sensitive Arrays.equals.
    return Arrays.equals(keys, that.keys);
}

/** Combines the super hash (distribution type) with an order-sensitive hash of the keys. */
@Override
public int hashCode() {
    int result = super.hashCode();
    result = 31 * result + Arrays.hashCode(keys);
    return result;
}

/** Returns e.g. {@code HASH[0, 2]} for keys {0, 2}. */
@Override
public String toString() {
    return "HASH" + Arrays.toString(keys);
}
}

/** The input does not require any specific data distribution. */
public static final RequiredDistribution ANY_DISTRIBUTION =
new RequiredDistribution(DistributionType.ANY) {};

/**
* The input will read all records for each parallelism of the target node. All records appear
* in each parallelism.
*/
public static final RequiredDistribution BROADCAST_DISTRIBUTION =
new RequiredDistribution(DistributionType.BROADCAST) {};

/** The input will read all records, and the parallelism of the target node must be 1. */
public static final RequiredDistribution SINGLETON_DISTRIBUTION =
new RequiredDistribution(DistributionType.SINGLETON) {};

/**
* Returns a place-holder required distribution.
*
* <p>Currently {@link InputProperty} is only used for deadlock breakup and multi-input in batch
* mode, so for {@link ExecNode}s not affecting the algorithm we use this place-holder.
*
* <p>We should fill out the detailed {@link InputProperty} for each sub-class of {@link
* ExecNode} in the future.
*/
public static final RequiredDistribution UNKNOWN_DISTRIBUTION =
new RequiredDistribution(DistributionType.UNKNOWN) {};

/**
* The input will read the records whose keys hash to a particular hash value.
*
Expand Down
Loading

0 comments on commit c37905e

Please sign in to comment.