Skip to content

Commit

Permalink
[hotfix][table-common] Move specific input type strategies out of Inp…
Browse files Browse the repository at this point in the history
…utTypeStrategies
  • Loading branch information
twalthr committed Jun 17, 2021
1 parent 5bae5b5 commit c60c036
Show file tree
Hide file tree
Showing 10 changed files with 128 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.inference.InputTypeStrategies;
import org.apache.flink.table.types.inference.TypeStrategies;
import org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies;
import org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
Expand All @@ -47,9 +48,6 @@
import static org.apache.flink.table.types.inference.InputTypeStrategies.LITERAL;
import static org.apache.flink.table.types.inference.InputTypeStrategies.NO_ARGS;
import static org.apache.flink.table.types.inference.InputTypeStrategies.OUTPUT_IF_NULL;
import static org.apache.flink.table.types.inference.InputTypeStrategies.SPECIFIC_FOR_CAST;
import static org.apache.flink.table.types.inference.InputTypeStrategies.TWO_EQUALS_COMPARABLE;
import static org.apache.flink.table.types.inference.InputTypeStrategies.TWO_FULLY_COMPARABLE;
import static org.apache.flink.table.types.inference.InputTypeStrategies.and;
import static org.apache.flink.table.types.inference.InputTypeStrategies.commonType;
import static org.apache.flink.table.types.inference.InputTypeStrategies.comparable;
Expand All @@ -66,6 +64,8 @@
import static org.apache.flink.table.types.inference.TypeStrategies.matchFamily;
import static org.apache.flink.table.types.inference.TypeStrategies.nullable;
import static org.apache.flink.table.types.inference.TypeStrategies.varyingString;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TWO_EQUALS_COMPARABLE;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TWO_FULLY_COMPARABLE;

/** Dictionary of function definitions for all built-in functions. */
@PublicEvolving
Expand Down Expand Up @@ -1214,7 +1214,7 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
BuiltInFunctionDefinition.newBuilder()
.name("array")
.kind(SCALAR)
.inputTypeStrategy(InputTypeStrategies.SPECIFIC_FOR_ARRAY)
.inputTypeStrategy(SpecificInputTypeStrategies.ARRAY)
.outputTypeStrategy(SpecificTypeStrategies.ARRAY)
.build();

Expand All @@ -1229,7 +1229,7 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
BuiltInFunctionDefinition.newBuilder()
.name("map")
.kind(SCALAR)
.inputTypeStrategy(InputTypeStrategies.SPECIFIC_FOR_MAP)
.inputTypeStrategy(SpecificInputTypeStrategies.MAP)
.outputTypeStrategy(SpecificTypeStrategies.MAP)
.build();

Expand Down Expand Up @@ -1394,7 +1394,7 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
BuiltInFunctionDefinition.newBuilder()
.name("CURRENT_WATERMARK")
.kind(SCALAR)
.inputTypeStrategy(InputTypeStrategies.SPECIFIC_FOR_CURRENT_WATERMARK)
.inputTypeStrategy(SpecificInputTypeStrategies.CURRENT_WATERMARK)
.outputTypeStrategy(SpecificTypeStrategies.CURRENT_WATERMARK)
.notDeterministic()
.runtimeProvided()
Expand Down Expand Up @@ -1472,7 +1472,7 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
BuiltInFunctionDefinition.newBuilder()
.name("cast")
.kind(SCALAR)
.inputTypeStrategy(SPECIFIC_FOR_CAST)
.inputTypeStrategy(SpecificInputTypeStrategies.CAST)
.outputTypeStrategy(
nullable(ConstantArgumentCount.to(0), TypeStrategies.argument(1)))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,17 @@
package org.apache.flink.table.types.inference;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.strategies.AndArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.AnyArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.CastInputTypeStrategy;
import org.apache.flink.table.types.inference.strategies.CommonArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.CommonInputTypeStrategy;
import org.apache.flink.table.types.inference.strategies.ComparableTypeStrategy;
import org.apache.flink.table.types.inference.strategies.CompositeArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.ConstraintArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.CurrentWatermarkInputTypeStrategy;
import org.apache.flink.table.types.inference.strategies.ExplicitArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.FamilyArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.LiteralArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.MapInputTypeStrategy;
import org.apache.flink.table.types.inference.strategies.OrArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.OrInputTypeStrategy;
import org.apache.flink.table.types.inference.strategies.OutputArgumentTypeStrategy;
Expand Down Expand Up @@ -302,47 +298,6 @@ public static InputTypeStrategy commonType(int count) {
return new CommonInputTypeStrategy(ConstantArgumentCount.of(count));
}

// --------------------------------------------------------------------------------------------
// Specific input type strategies
// --------------------------------------------------------------------------------------------

/** Strategy specific for {@link BuiltInFunctionDefinitions#CAST}. */
public static final InputTypeStrategy SPECIFIC_FOR_CAST = new CastInputTypeStrategy();

/**
* Strategy specific for {@link BuiltInFunctionDefinitions#ARRAY}.
*
* <p>It expects at least one argument. All the arguments must have a common super type.
*/
public static final InputTypeStrategy SPECIFIC_FOR_ARRAY =
new CommonInputTypeStrategy(ConstantArgumentCount.from(1));

/**
* Strategy specific for {@link BuiltInFunctionDefinitions#MAP}.
*
* <p>It expects at least two arguments. There must be even number of arguments. All the keys
* and values must have a common super type respectively.
*/
public static final InputTypeStrategy SPECIFIC_FOR_MAP = new MapInputTypeStrategy();

/**
* Strategy that checks all types are fully comparable with each other. Requires exactly two
* arguments.
*/
public static final InputTypeStrategy TWO_FULLY_COMPARABLE =
comparable(ConstantArgumentCount.of(2), StructuredComparision.FULL);

/**
* Strategy that checks all types are equals comparable with each other. Requires exactly two
* arguments.
*/
public static final InputTypeStrategy TWO_EQUALS_COMPARABLE =
comparable(ConstantArgumentCount.of(2), StructuredComparision.EQUALS);

/** Strategy specific for {@link BuiltInFunctionDefinitions#CURRENT_WATERMARK}. */
public static final InputTypeStrategy SPECIFIC_FOR_CURRENT_WATERMARK =
new CurrentWatermarkInputTypeStrategy();

// --------------------------------------------------------------------------------------------

private InputTypeStrategies() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
* second one. The second one must be a type literal.
*/
@Internal
public final class CastInputTypeStrategy implements InputTypeStrategy {
class CastInputTypeStrategy implements InputTypeStrategy {

@Override
public ArgumentCount getArgumentCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
* <p>It expects a single argument representing a rowtime attribute.
*/
@Internal
public class CurrentWatermarkInputTypeStrategy implements InputTypeStrategy {
class CurrentWatermarkInputTypeStrategy implements InputTypeStrategy {

@Override
public ArgumentCount getArgumentCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
* values must have a common super type respectively.
*/
@Internal
public final class MapInputTypeStrategy implements InputTypeStrategy {
class MapInputTypeStrategy implements InputTypeStrategy {

private static final ArgumentCount AT_LEAST_TWO_EVEN =
new ArgumentCount() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.types.inference.strategies;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.inference.InputTypeStrategies;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.logical.StructuredType;

import static org.apache.flink.table.types.inference.InputTypeStrategies.comparable;

/**
* Entry point for specific input type strategies not covered in {@link InputTypeStrategies}.
*
* <p>This primarily serves the purpose of reducing visibility of individual type strategy
* implementations to avoid polluting the API classpath.
*/
@Internal
public final class SpecificInputTypeStrategies {

/** See {@link CastInputTypeStrategy}. */
public static final InputTypeStrategy CAST = new CastInputTypeStrategy();

/** See {@link MapInputTypeStrategy}. */
public static final InputTypeStrategy MAP = new MapInputTypeStrategy();

/** See {@link CurrentWatermarkTypeStrategy}. */
public static final InputTypeStrategy CURRENT_WATERMARK =
new CurrentWatermarkInputTypeStrategy();

// --------------------------------------------------------------------------------------------
// Strategies composed of other strategies
// --------------------------------------------------------------------------------------------

/**
* Strategy specific for {@link BuiltInFunctionDefinitions#ARRAY}.
*
* <p>It expects at least one argument. All the arguments must have a common super type.
*/
public static final InputTypeStrategy ARRAY =
new CommonInputTypeStrategy(ConstantArgumentCount.from(1));

/**
* Strategy that checks all types are fully comparable with each other. Requires exactly two
* arguments.
*/
public static final InputTypeStrategy TWO_FULLY_COMPARABLE =
comparable(ConstantArgumentCount.of(2), StructuredType.StructuredComparision.FULL);

/**
* Strategy that checks all types are equals comparable with each other. Requires exactly two
* arguments.
*/
public static final InputTypeStrategy TWO_EQUALS_COMPARABLE =
comparable(ConstantArgumentCount.of(2), StructuredType.StructuredComparision.EQUALS);

private SpecificInputTypeStrategies() {
// no instantiation
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* implementations to avoid polluting the API classpath.
*/
@Internal
public class SpecificTypeStrategies {
public final class SpecificTypeStrategies {

/** See {@link RowTypeStrategy}. */
public static final TypeStrategy ROW = new RowTypeStrategy();
Expand Down
Loading

0 comments on commit c60c036

Please sign in to comment.