diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java index 1ed86b3c841bc..827bc7777dfbd 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java +++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java @@ -27,8 +27,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; /** - * Wordcount for placing at least something into the jar file. - * + * WordCount for placing at least something into the jar file. */ public class WordCount { @@ -98,7 +97,7 @@ public void flatMap(String value, Collector> out) { // ************************************************************************* private static boolean fileOutput = false; - private static boolean verbose = false; + private static String textPath; private static String outputPath; @@ -111,7 +110,7 @@ private static boolean parseParameters(String[] args) { textPath = args[0]; outputPath = args[1]; } else if(args.length == 4 && (args[0].startsWith("-v") || args[0].startsWith("--verbose"))) { // cli line: program {optArg} {optVal} {textPath} {outputPath} - verbose = Boolean.valueOf(args[1]); + Boolean.valueOf(args[1]); // parse verbosity flag textPath = args[2]; outputPath = args[3]; } else { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java index 26ed788fb29b4..e9aa358b8716b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java @@ -56,8 +56,7 @@ public class DualInputSemanticProperties extends SemanticProperties { public DualInputSemanticProperties() { - super(); - this.init(); + init(); } /** @@ -251,15 +250,24 @@ public FieldSet getReadFields2() 
{ */ @Override public void clearProperties() { - this.init(); super.clearProperties(); + init(); } + @Override + public boolean isEmpty() { + return super.isEmpty() && + (forwardedFields1 == null || forwardedFields1.isEmpty()) && + (forwardedFields2 == null || forwardedFields2.isEmpty()) && + (readFields1 == null || readFields1.size() == 0) && + (readFields2 == null || readFields2.size() == 0); + } + + private void init() { this.forwardedFields1 = new HashMap(); this.forwardedFields2 = new HashMap(); this.readFields1 = null; this.readFields2 = null; } - } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java index 7e1054dd9e163..ba801ec67ce1f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java @@ -30,11 +30,10 @@ public abstract class SemanticProperties implements Serializable { private static final long serialVersionUID = 1L; - /** - * Set of fields that are written in the destination record(s). - */ + /** Set of fields that are written in the destination record(s).*/ private FieldSet writtenFields; + /** * Adds, to the existing information, field(s) that are written in * the destination record(s). @@ -71,10 +70,10 @@ public FieldSet getWrittenFields() { * Clears the object. 
*/ public void clearProperties() { - this.init(); + this.writtenFields = null; } - private void init() { - this.writtenFields = null; + public boolean isEmpty() { + return this.writtenFields == null || this.writtenFields.size() == 0; } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java index 45f020ac45b24..77ed1bc2dac2b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java @@ -27,23 +27,18 @@ * Container for the semantic properties associated to a single input operator. */ public class SingleInputSemanticProperties extends SemanticProperties { + private static final long serialVersionUID = 1L; - /** - * Mapping from fields in the source record(s) to fields in the destination - * record(s). - */ + /**Mapping from fields in the source record(s) to fields in the destination record(s). */ private Map forwardedFields; - /** - * Set of fields that are read in the source record(s). 
- */ + /** Set of fields that are read in the source record(s).*/ private FieldSet readFields; public SingleInputSemanticProperties() { - super(); - this.init(); + init(); } /** @@ -140,8 +135,15 @@ public FieldSet getReadFields() { */ @Override public void clearProperties() { - this.init(); super.clearProperties(); + init(); + } + + @Override + public boolean isEmpty() { + return super.isEmpty() && + (forwardedFields == null || forwardedFields.isEmpty()) && + (readFields == null || readFields.size() == 0); } private void init() { @@ -206,5 +208,10 @@ public void addWrittenFields(FieldSet writtenFields) { public void setWrittenFields(FieldSet writtenFields) { throw new UnsupportedOperationException(); } + + @Override + public boolean isEmpty() { + return false; + } } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index 6415570dd61de..edd49e2cdcff2 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapPartitionFunction; import org.apache.flink.api.common.functions.Partitioner; @@ -68,6 +69,7 @@ import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.InputTypeConfigurable; +import org.apache.flink.api.java.typeutils.MissingTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.core.fs.FileSystem.WriteMode; @@ -91,19 +93,22 @@ public 
abstract class DataSet { private final ExecutionEnvironment context; - private final TypeInformation type; - - protected DataSet(ExecutionEnvironment context, TypeInformation type) { + // NOTE: the type must not be accessed directly, but only via getType() + private TypeInformation type; + + private boolean typeUsed = false; + + + protected DataSet(ExecutionEnvironment context, TypeInformation typeInfo) { if (context == null) { throw new NullPointerException("context is null"); } - - if (type == null) { - throw new NullPointerException("type is null"); + if (typeInfo == null) { + throw new NullPointerException("typeInfo is null"); } - + this.context = context; - this.type = type; + this.type = typeInfo; } /** @@ -117,6 +122,29 @@ public ExecutionEnvironment getExecutionEnvironment() { return this.context; } + // -------------------------------------------------------------------------------------------- + // Type Information handling + // -------------------------------------------------------------------------------------------- + + /** + * Tries to fill in the type information. Type information can be filled in later when the program uses + * a type hint. This method checks whether the type information has ever been accessed before and does not + * allow modifications if the type was accessed already. This ensures consistency by making sure different + * parts of the operation do not assume different type information. + * + * @param typeInfo The type information to fill in. + * + * @throws IllegalStateException Thrown, if the type information has been accessed before. + */ + protected void fillInType(TypeInformation typeInfo) { + if (typeUsed) { + throw new IllegalStateException("TypeInformation cannot be filled in for the type after it has been used. 
" + + "Please make sure that the type info hints are the first call after the transformation function, " + + "before any access to types or semantic properties, etc."); + } + this.type = typeInfo; + } + /** * Returns the {@link TypeInformation} for the type of this DataSet. * @@ -125,6 +153,15 @@ public ExecutionEnvironment getExecutionEnvironment() { * @see TypeInformation */ public TypeInformation getType() { + if (type instanceof MissingTypeInfo) { + MissingTypeInfo typeInfo = (MissingTypeInfo) type; + throw new InvalidTypesException("The return type of function '" + typeInfo.getFunctionName() + + "' could not be determined automatically, due to type erasure. " + + "You can give type information hints by using the returns(...) method on the result of " + + "the transformation call, or by letting your function implement the 'ResultTypeQueryable' " + + "interface.", typeInfo.getTypeException()); + } + typeUsed = true; return this.type; } @@ -141,25 +178,25 @@ public F clean(F f) { // -------------------------------------------------------------------------------------------- /** - * Applies a Map transformation on a {@link DataSet}.
- * The transformation calls a {@link org.apache.flink.api.common.functions.RichMapFunction} for each element of the DataSet. + * Applies a Map transformation on this DataSet.
+ * The transformation calls a {@link org.apache.flink.api.common.functions.MapFunction} for each element of the DataSet. * Each MapFunction call returns exactly one element. * * @param mapper The MapFunction that is called for each element of the DataSet. * @return A MapOperator that represents the transformed DataSet. * + * @see org.apache.flink.api.common.functions.MapFunction * @see org.apache.flink.api.common.functions.RichMapFunction * @see MapOperator - * @see DataSet */ public MapOperator map(MapFunction mapper) { if (mapper == null) { throw new NullPointerException("Map function must not be null."); } - TypeInformation resultType = TypeExtractor.getMapReturnTypes(mapper, this.getType()); - - return new MapOperator(this, resultType, clean(mapper), Utils.getCallLocationName()); + String callLocation = Utils.getCallLocationName(); + TypeInformation resultType = TypeExtractor.getMapReturnTypes(mapper, getType(), callLocation, true); + return new MapOperator(this, resultType, clean(mapper), callLocation); } @@ -180,14 +217,15 @@ public MapOperator map(MapFunction mapper) { * * @see MapPartitionFunction * @see MapPartitionOperator - * @see DataSet */ public MapPartitionOperator mapPartition(MapPartitionFunction mapPartition ){ if (mapPartition == null) { throw new NullPointerException("MapPartition function must not be null."); } - TypeInformation resultType = TypeExtractor.getMapPartitionReturnTypes(mapPartition, this.getType()); - return new MapPartitionOperator(this, resultType, clean(mapPartition), Utils.getCallLocationName()); + + String callLocation = Utils.getCallLocationName(); + TypeInformation resultType = TypeExtractor.getMapPartitionReturnTypes(mapPartition, getType(), callLocation, true); + return new MapPartitionOperator(this, resultType, clean(mapPartition), callLocation); } /** @@ -207,8 +245,9 @@ public FlatMapOperator flatMap(FlatMapFunction flatMapper) { throw new NullPointerException("FlatMap function must not be null."); } - TypeInformation 
resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, this.getType()); - return new FlatMapOperator(this, resultType, clean(flatMapper), Utils.getCallLocationName()); + String callLocation = Utils.getCallLocationName(); + TypeInformation resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, getType(), callLocation, true); + return new FlatMapOperator(this, resultType, clean(flatMapper), callLocation); } /** @@ -288,7 +327,7 @@ public AggregateOperator aggregate(Aggregations agg, int field) { * @see org.apache.flink.api.java.operators.AggregateOperator */ public AggregateOperator sum (int field) { - return this.aggregate (Aggregations.SUM, field); + return aggregate(Aggregations.SUM, field); } /** @@ -362,8 +401,10 @@ public GroupReduceOperator reduceGroup(GroupReduceFunction reduc if (reducer == null) { throw new NullPointerException("GroupReduce function must not be null."); } - TypeInformation resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getType()); - return new GroupReduceOperator(this, resultType, clean(reducer), Utils.getCallLocationName()); + + String callLocation = Utils.getCallLocationName(); + TypeInformation resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, getType(), callLocation, true); + return new GroupReduceOperator(this, resultType, clean(reducer), callLocation); } /** @@ -394,12 +435,12 @@ public GroupReduceOperator reduceGroup(GroupReduceFunction reduc */ @SuppressWarnings({ "unchecked", "rawtypes" }) public ReduceOperator minBy(int... fields) { - if(!type.isTupleType()) { + if(!getType().isTupleType()) { throw new InvalidProgramException("DataSet#minBy(int...) only works on Tuple types."); } return new ReduceOperator(this, new SelectByMinFunction( - (TupleTypeInfo) type, fields), Utils.getCallLocationName()); + (TupleTypeInfo) getType(), fields), Utils.getCallLocationName()); } /** @@ -430,12 +471,12 @@ public ReduceOperator minBy(int... 
fields) { */ @SuppressWarnings({ "unchecked", "rawtypes" }) public ReduceOperator maxBy(int... fields) { - if(!type.isTupleType()) { + if(!getType().isTupleType()) { throw new InvalidProgramException("DataSet#maxBy(int...) only works on Tuple types."); } return new ReduceOperator(this, new SelectByMaxFunction( - (TupleTypeInfo) type, fields), Utils.getCallLocationName()); + (TupleTypeInfo) getType(), fields), Utils.getCallLocationName()); } /** @@ -466,7 +507,7 @@ public GroupReduceOperator first(int n) { * @return A DistinctOperator that represents the distinct DataSet. */ public DistinctOperator distinct(KeySelector keyExtractor) { - TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type); + TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, getType()); return new DistinctOperator(this, new Keys.SelectorFunctionKeys(keyExtractor, getType(), keyType), Utils.getCallLocationName()); } @@ -538,7 +579,7 @@ public DistinctOperator distinct() { * @see DataSet */ public UnsortedGrouping groupBy(KeySelector keyExtractor) { - TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type); + TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, getType()); return new UnsortedGrouping(this, new Keys.SelectorFunctionKeys(clean(keyExtractor), getType(), keyType)); } @@ -976,7 +1017,7 @@ public PartitionOperator partitionByHash(String... 
fields) { * @see KeySelector */ public > PartitionOperator partitionByHash(KeySelector keyExtractor) { - final TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type); + final TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, getType()); return new PartitionOperator(this, PartitionMethod.HASH, new Keys.SelectorFunctionKeys(clean(keyExtractor), this.getType(), keyType), Utils.getCallLocationName()); } @@ -1023,8 +1064,8 @@ public PartitionOperator partitionCustom(Partitioner partitioner, Stri * @see KeySelector */ public > PartitionOperator partitionCustom(Partitioner partitioner, KeySelector keyExtractor) { - final TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type); - return new PartitionOperator(this, new Keys.SelectorFunctionKeys(keyExtractor, this.getType(), keyType), clean(partitioner), Utils.getCallLocationName()); + final TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, getType()); + return new PartitionOperator(this, new Keys.SelectorFunctionKeys(keyExtractor, getType(), keyType), clean(partitioner), Utils.getCallLocationName()); } /** @@ -1076,7 +1117,7 @@ public DataSink writeAsText(String filePath, WriteMode writeMode) { return output(tof); } -/** + /** * Writes a DataSet as a text file to the specified location.
* For each element of the DataSet the result of {@link TextFormatter#format(Object)} is written. * @@ -1175,7 +1216,7 @@ public DataSink writeAsCsv(String filePath, String rowDelimiter, String field @SuppressWarnings("unchecked") private DataSink internalWriteAsCsv(Path filePath, String rowDelimiter, String fieldDelimiter, WriteMode wm) { - Validate.isTrue(this.type.isTupleType(), "The writeAsCsv() method can only be used on data sets of tuples."); + Validate.isTrue(getType().isTupleType(), "The writeAsCsv() method can only be used on data sets of tuples."); CsvOutputFormat of = new CsvOutputFormat(filePath, rowDelimiter, fieldDelimiter); if(wm != null) { of.setWriteMode(wm); @@ -1258,10 +1299,10 @@ public DataSink output(OutputFormat outputFormat) { // configure the type if needed if (outputFormat instanceof InputTypeConfigurable) { - ((InputTypeConfigurable) outputFormat).setInputType(this.type); + ((InputTypeConfigurable) outputFormat).setInputType(getType()); } - DataSink sink = new DataSink(this, outputFormat, this.type); + DataSink sink = new DataSink(this, outputFormat, getType()); this.context.registerDataSink(sink); return sink; } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java index edb1c74ef110a..25875a0e4c8b9 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java @@ -1,5 +1,4 @@ - -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java index fed402f5e3d48..b33139b766a61 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java @@ -122,8 +122,11 @@ public CoGroupOperator(DataSet input1, DataSet input2, Keys keys1, K this.keys1 = keys1; this.keys2 = keys2; - - extractSemanticAnnotationsFromUdf(function.getClass()); + } + + @Override + protected CoGroupFunction getFunction() { + return function; } protected Keys getKeys1() { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java index 078d91eb1a2cc..e2d225cfca2fd 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java @@ -51,6 +51,7 @@ public class CrossOperator extends TwoInputUdfOperator> { private final CrossFunction function; + private final String defaultName; public CrossOperator(DataSet input1, DataSet input2, @@ -62,17 +63,15 @@ public CrossOperator(DataSet input1, DataSet input2, this.function = function; this.defaultName = defaultName; - - if (!(function instanceof ProjectCrossFunction)) { - extractSemanticAnnotationsFromUdf(function.getClass()); - } else { - generateProjectionProperties(((ProjectCrossFunction) function)); - } } - - public void generateProjectionProperties(ProjectCrossFunction pcf) { - DualInputSemanticProperties props = SemanticPropUtil.createProjectionPropertiesDual(pcf.getFields(), pcf.getIsFromFirst()); - setSemanticProperties(props); + + @Override + protected CrossFunction getFunction() { + return function; 
+ } + + private String getDefaultName() { + return defaultName; } @Override @@ -112,7 +111,7 @@ public static final class DefaultCross extends CrossOperator input2; public DefaultCross(DataSet input1, DataSet input2, String defaultName) { - super(input1, input2, (CrossFunction>) new DefaultCrossFunction(), + super(input1, input2, new DefaultCrossFunction(), new TupleTypeInfo>(input1.getType(), input2.getType()), defaultName); if (input1 == null || input2 == null) { @@ -137,7 +136,8 @@ public CrossOperator with(CrossFunction function) { if (function == null) { throw new NullPointerException("Cross function must not be null."); } - TypeInformation returnType = TypeExtractor.getCrossReturnTypes(function, input1.getType(), input2.getType()); + TypeInformation returnType = TypeExtractor.getCrossReturnTypes(function, input1.getType(), input2.getType(), + super.getDefaultName(), true); return new CrossOperator(input1, input2, clean(function), returnType, Utils.getCallLocationName()); } @@ -221,6 +221,11 @@ protected ProjectCross(DataSet input1, DataSet input2, int[] fields, boo this.crossProjection = crossProjection; } + @Override + protected ProjectCrossFunction getFunction() { + return (ProjectCrossFunction) super.getFunction(); + } + /** * Continues a ProjectCross transformation and adds fields of the first cross input to the projection.
* If the first cross input is a {@link Tuple} {@link DataSet}, fields can be selected by their index. @@ -277,10 +282,6 @@ public ProjectCross projectSecond(int... second /** * Deprecated method only kept for compatibility. - * - * @param types - * - * @return */ @SuppressWarnings({ "hiding", "unchecked" }) @Deprecated @@ -308,6 +309,12 @@ public CrossOperator withConstantSetFirst(String... constantSetFirs public CrossOperator withConstantSetSecond(String... constantSetSecond) { throw new InvalidProgramException("The semantic properties (constant fields and forwarded fields) are automatically calculated."); } + + @Override + protected DualInputSemanticProperties extractSemanticAnnotationsFromUdf(Class udfClass) { + // we do not extract anything, but construct the properties from the projection + return SemanticPropUtil.createProjectionPropertiesDual(getFunction().getFields(), getFunction().getIsFromFirst()); + } } public static final class ProjectCrossFunction implements CrossFunction { @@ -597,11 +604,9 @@ protected CrossProjection projectSecond(int... secondFieldIndexes) { // GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator. /** - * Chooses a projectTupleX according to the length of {@link CrossProjection#fieldIndexes} + * Chooses a projectTupleX according to the length of {@link org.apache.flink.api.java.operators.CrossOperator.CrossProjection#fieldIndexes} * * @return The projected DataSet. 
- * - * @see ProjectCross */ @SuppressWarnings("unchecked") public ProjectCross projectTupleX() { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java index fa2d4d6518d8f..d30dc5e20d249 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java @@ -57,8 +57,6 @@ public DistinctOperator(DataSet input, Keys keys, String distinctLocationN // if keys is null distinction is done on all tuple fields if (keys == null) { if (input.getType() instanceof CompositeType) { - - CompositeType cType = (CompositeType) input.getType(); keys = new Keys.ExpressionKeys(new String[] {Keys.ExpressionKeys.SELECT_ALL_CHAR }, input.getType()); } else { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java index 1d93b0afd1572..56bea50b16580 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java @@ -41,7 +41,11 @@ public FilterOperator(DataSet input, FilterFunction function, String defau this.function = function; this.defaultName = defaultName; - extractSemanticAnnotationsFromUdf(function.getClass()); + } + + @Override + protected FilterFunction getFunction() { + return function; } @Override @@ -51,17 +55,17 @@ protected org.apache.flink.api.common.operators.base.FilterOperatorBase po = new PlanFilterOperator(function, name, getInputType()); - // set input po.setInput(input); + // set dop - if(this.getParallelism() > 0) { + if (getParallelism() > 0) { // use specified dop - po.setDegreeOfParallelism(this.getParallelism()); + po.setDegreeOfParallelism(getParallelism()); } else { // if no dop has been 
specified, use dop of input operator to enable chaining po.setDegreeOfParallelism(input.getDegreeOfParallelism()); } - + return po; } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java index 73d6095151b81..47446dd1f74bd 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java @@ -43,7 +43,11 @@ public FlatMapOperator(DataSet input, TypeInformation resultType, FlatM this.function = function; this.defaultName = defaultName; - extractSemanticAnnotationsFromUdf(function.getClass()); + } + + @Override + protected FlatMapFunction getFunction() { + return function; } @Override diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java index bef91ede1c07d..0e2fb5e27af1a 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java @@ -82,8 +82,6 @@ public GroupReduceOperator(Grouping input, TypeInformation resultType, this.defaultName = defaultName; checkCombinability(); - - extractSemanticAnnotationsFromUdf(function.getClass()); } private void checkCombinability() { @@ -92,6 +90,12 @@ private void checkCombinability() { this.combinable = true; } } + + + @Override + protected GroupReduceFunction getFunction() { + return function; + } // -------------------------------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java index 88bd273787299..822759f1ebc5b 100644 --- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java @@ -21,6 +21,8 @@ import java.security.InvalidParameterException; import java.util.Arrays; +import com.google.common.base.Preconditions; + import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.functions.JoinFunction; @@ -55,8 +57,6 @@ import org.apache.flink.api.java.tuple.*; //CHECKSTYLE.ON: AvoidStarImport -import com.google.common.base.Preconditions; - /** * A {@link DataSet} that is the result of a Join transformation. * @@ -200,12 +200,6 @@ public EquiJoin(DataSet input1, DataSet input2, this.function = function; this.joinLocationName = joinLocationName; - - if (!(function instanceof ProjectFlatJoinFunction)) { - extractSemanticAnnotationsFromUdf(function.getClass()); - } else { - generateProjectionProperties(((ProjectFlatJoinFunction) function)); - } } public EquiJoin(DataSet input1, DataSet input2, @@ -221,37 +215,21 @@ public EquiJoin(DataSet input1, DataSet input2, } this.function = generatedFunction; + } + + @Override + protected FlatJoinFunction getFunction() { + return function; + } - if (!(generatedFunction instanceof ProjectFlatJoinFunction)) { - extractSemanticAnnotationsFromUdf(function.getClass()); + @Override + protected DualInputSemanticProperties extractSemanticAnnotationsFromUdf(Class udfClass) { + if (function instanceof DefaultJoin.WrappingFlatJoinFunction) { + return super.extractSemanticAnnotationsFromUdf(((WrappingFunction) function).getWrappedFunction().getClass()); } else { - generateProjectionProperties(((ProjectFlatJoinFunction) generatedFunction)); + return super.extractSemanticAnnotationsFromUdf(function.getClass()); } } - - public void generateProjectionProperties(ProjectFlatJoinFunction pjf) { - DualInputSemanticProperties props = 
SemanticPropUtil.createProjectionPropertiesDual(pjf.getFields(), pjf.getIsFromFirst()); - setSemanticProperties(props); - } - - // TODO -// public EquiJoin leftOuter() { -// this.preserve1 = true; -// return this; -// } - - // TODO -// public EquiJoin rightOuter() { -// this.preserve2 = true; -// return this; -// } - - // TODO -// public EquiJoin fullOuter() { -// this.preserve1 = true; -// this.preserve2 = true; -// return this; -// } @Override protected JoinOperatorBase translateToDataFlow(Operator input1, Operator input2) { @@ -647,6 +625,11 @@ protected ProjectJoin(DataSet input1, DataSet input2, Keys keys1, Ke this.joinProj = joinProj; } + + @Override + protected ProjectFlatJoinFunction getFunction() { + return (ProjectFlatJoinFunction) super.getFunction(); + } /** * Continues a ProjectJoin transformation and adds fields of the first join input to the projection.
@@ -706,8 +689,6 @@ public ProjectJoin projectSecond(int... secondF * Deprecated method only kept for compatibility. * * @param types - * - * @return */ @SuppressWarnings({ "unchecked", "hiding" }) @Deprecated @@ -735,6 +716,13 @@ public JoinOperator withConstantSetFirst(String... constantSetFirst public JoinOperator withConstantSetSecond(String... constantSetSecond) { throw new InvalidProgramException("The semantic properties (constant fields and forwarded fields) are automatically calculated."); } + + @Override + protected DualInputSemanticProperties extractSemanticAnnotationsFromUdf(Class udfClass) { + // we do not extract the annotation, we construct the properties from the projection# + return SemanticPropUtil.createProjectionPropertiesDual(getFunction().getFields(), getFunction().getIsFromFirst()); + } + } // @SuppressWarnings("unused") @@ -1282,11 +1270,12 @@ protected JoinProjection projectSecond(int... secondFieldIndexes) { // GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator. /** - * Chooses a projectTupleX according to the length of {@link JoinProjection#fieldIndexes} + * Chooses a projectTupleX according to the length of + * {@link org.apache.flink.api.java.operators.JoinOperator.JoinProjection#fieldIndexes} * * @return The projected DataSet. 
* - * @see ProjectJoin + * @see org.apache.flink.api.java.operators.JoinOperator.ProjectJoin */ @SuppressWarnings("unchecked") public ProjectJoin projectTupleX() { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java index c2a2a8ee27f8e..5fca38a223e52 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java @@ -76,6 +76,9 @@ public SelectorFunctionKeys(KeySelector keyExtractor, TypeInformation i if (keyExtractor == null) { throw new NullPointerException("Key extractor must not be null."); } + if (keyType == null) { + throw new NullPointerException("Key type must not be null."); + } this.keyExtractor = keyExtractor; this.keyType = keyType; diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java index b4433dc7d70bf..9e96c642a171c 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java @@ -45,7 +45,11 @@ public MapOperator(DataSet input, TypeInformation resultType, MapFuncti this.defaultName = defaultName; this.function = function; - extractSemanticAnnotationsFromUdf(function.getClass()); + } + + @Override + protected MapFunction getFunction() { + return function; } @Override diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java index 067f7afff596b..a6c69c10c3987 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java @@ -45,7 +45,11 @@ public 
MapPartitionOperator(DataSet input, TypeInformation resultType, this.function = function; this.defaultName = defaultName; - extractSemanticAnnotationsFromUdf(function.getClass()); + } + + @Override + protected MapPartitionFunction getFunction() { + return function; } @Override diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java index 22d4d447e3c20..edb5a68a224c6 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java @@ -40,7 +40,7 @@ * * @param The type of the data being partitioned. */ -public class PartitionOperator extends SingleInputUdfOperator> { +public class PartitionOperator extends SingleInputOperator> { private final Keys pKeys; private final PartitionMethod pMethod; diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java index daf129a6f89e4..0b2aa95c1aa81 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java @@ -101,10 +101,6 @@ public ProjectOperator project(int... fieldIndexes) } /** * Deprecated method only kept for compatibility. - * - * @param types - * - * @return */ @SuppressWarnings({ "unchecked", "hiding" }) @Deprecated @@ -180,11 +176,12 @@ private void acceptAdditionalIndexes(int... additionalIndexes) { // GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator. /** - * Chooses a projectTupleX according to the length of {@link Projection#fieldIndexes} + * Chooses a projectTupleX according to the length of + * {@link org.apache.flink.api.java.operators.ProjectOperator.Projection#fieldIndexes} * * @return The projected DataSet. 
* - * @see Projection + * @see org.apache.flink.api.java.operators.ProjectOperator.Projection */ @SuppressWarnings("unchecked") public ProjectOperator projectTupleX() { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java index 02b0ede375062..d1ad4c397ee10 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java @@ -61,8 +61,6 @@ public ReduceOperator(DataSet input, ReduceFunction function, String def this.function = function; this.grouper = null; this.defaultName = defaultName; - - extractSemanticAnnotationsFromUdf(function.getClass()); } @@ -72,8 +70,11 @@ public ReduceOperator(Grouping input, ReduceFunction function, String de this.function = function; this.grouper = input; this.defaultName = defaultName; - - extractSemanticAnnotationsFromUdf(function.getClass()); + } + + @Override + protected ReduceFunction getFunction() { + return function; } @Override diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java index 0823c153f5637..0d0cb15504ce5 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java @@ -24,12 +24,16 @@ import java.util.Map; import java.util.Set; +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.functions.FunctionAnnotation; import 
org.apache.flink.api.java.functions.SemanticPropUtil; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.Configuration; -import org.apache.flink.api.java.DataSet; /** * The SingleInputUdfOperator is the base class of all unary operators that execute @@ -65,11 +69,7 @@ protected SingleInputUdfOperator(DataSet input, TypeInformation resultT } - protected void extractSemanticAnnotationsFromUdf(Class udfClass) { - Set annotations = FunctionAnnotation.readSingleConstantAnnotations(udfClass); - SingleInputSemanticProperties sp = SemanticPropUtil.getSemanticPropsSingle(annotations, getInputType(), getResultType()); - setSemanticProperties(sp); - } + protected abstract Function getFunction(); // -------------------------------------------------------------------------------------------- // Fluent API methods @@ -130,6 +130,120 @@ public O withConstantSet(String... constantSet) { O returnType = (O) this; return returnType; } + + /** + * Adds a type information hint about the return type of this operator. + * + *

+ * Type hints are important in cases where the Java compiler + * throws away generic type information necessary for efficient execution. + * + *

+ * This method takes a type information string that will be parsed. A type information string can contain the following + * types: + * + *

    + *
  • Basic types such as Integer, String, etc. + *
  • Basic type arrays such as Integer[], + * String[], etc. + *
  • Tuple types such as Tuple1<TYPE0>, + * Tuple2<TYPE0, TYPE1>, etc.
  • + *
  • POJO types such as org.my.MyPojo<myFieldName=TYPE0,myFieldName2=TYPE1>, etc.
  • + *
  • Generic types such as java.lang.Class, etc. + *
  • Custom type arrays such as org.my.CustomClass[], + * org.my.CustomClass$StaticInnerClass[], etc. + *
  • Value types such as DoubleValue, + * StringValue, IntegerValue, etc.
  • + *
  • Tuple array types such as Tuple2<TYPE0,TYPE1>[], etc.
  • + *
  • Writable types such as Writable<org.my.CustomWritable>
  • + *
  • Enum types such as Enum<org.my.CustomEnum>
  • + *
+ * + * Example: + * "Tuple2<String,Tuple2<Integer,org.my.MyJob$Pojo<word=String>>>" + * + * @param typeInfoString + * type information string to be parsed + * @return This operator with a given return type hint. + */ + public O returns(String typeInfoString) { + if (typeInfoString == null) { + throw new IllegalArgumentException("Type information string must not be null."); + } + return returns(TypeInfoParser.parse(typeInfoString)); + } + + /** + * Adds a type information hint about the return type of this operator. + * + *

+ * Type hints are important in cases where the Java compiler + * throws away generic type information necessary for efficient execution. + * + *

+ * This method takes an instance of {@link org.apache.flink.api.common.typeinfo.TypeInformation} such as: + * + *

    + *
  • {@link org.apache.flink.api.common.typeinfo.BasicTypeInfo}
  • + *
  • {@link org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo}
  • + *
  • {@link org.apache.flink.api.java.typeutils.TupleTypeInfo}
  • + *
  • {@link org.apache.flink.api.java.typeutils.PojoTypeInfo}
  • + *
  • {@link org.apache.flink.api.java.typeutils.WritableTypeInfo}
  • + *
  • {@link org.apache.flink.api.java.typeutils.ValueTypeInfo}
  • + *
  • etc.
  • + *
+ * + * @param typeInfo + * type information as a return type hint + * @return This operator with a given return type hint. + */ + public O returns(TypeInformation typeInfo) { + if (typeInfo == null) { + throw new IllegalArgumentException("Type information must not be null."); + } + fillInType(typeInfo); + @SuppressWarnings("unchecked") + O returnType = (O) this; + return returnType; + } + + /** + * Adds a type information hint about the return type of this operator. + * + *

+ * Type hints are important in cases where the Java compiler + * throws away generic type information necessary for efficient execution. + * + *

+ * This method takes a class that will be analyzed by Flink's type extraction capabilities. + * + *

+ * Examples for classes are: + *

    + *
  • Basic types such as Integer.class, String.class, etc.
  • + *
  • POJOs such as MyPojo.class
  • + *
  • Classes that extend tuples. Classes like Tuple1.class, Tuple2.class, etc. are not sufficient.
  • + *
  • Arrays such as String[].class, etc.
  • + *
+ * + * @param typeClass + * class as a return type hint + * @return This operator with a given return type hint. + */ + @SuppressWarnings("unchecked") + public O returns(Class typeClass) { + if (typeClass == null) { + throw new IllegalArgumentException("Type class must not be null."); + } + + try { + TypeInformation ti = (TypeInformation) TypeExtractor.createTypeInfo(typeClass); + return returns(ti); + } + catch (InvalidTypesException e) { + throw new InvalidTypesException("The given class is not suited for providing necessary type information.", e); + } + } // -------------------------------------------------------------------------------------------- // Accessors @@ -149,6 +263,11 @@ public Configuration getParameters() { @Override public SingleInputSemanticProperties getSemanticProperties() { + if (udfSemantics == null) { + SingleInputSemanticProperties props = extractSemanticAnnotations(getFunction().getClass()); + udfSemantics = props != null ? props : new SingleInputSemanticProperties(); + } + return this.udfSemantics; } @@ -163,4 +282,9 @@ public SingleInputSemanticProperties getSemanticProperties() { public void setSemanticProperties(SingleInputSemanticProperties properties) { this.udfSemantics = properties; } + + protected SingleInputSemanticProperties extractSemanticAnnotations(Class udfClass) { + Set annotations = FunctionAnnotation.readSingleConstantAnnotations(udfClass); + return SemanticPropUtil.getSemanticPropsSingle(annotations, getInputType(), getResultType()); + } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java index 7ca0840ba30bd..2de928268d907 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java @@ -24,10 +24,14 @@ import java.util.Map; import java.util.Set; +import 
org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.operators.DualInputSemanticProperties; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.FunctionAnnotation; import org.apache.flink.api.java.functions.SemanticPropUtil; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.Configuration; import org.apache.flink.api.java.DataSet; @@ -67,14 +71,7 @@ protected TwoInputUdfOperator(DataSet input1, DataSet input2, TypeInfo super(input1, input2, resultType); } - protected void extractSemanticAnnotationsFromUdf(Class udfClass) { - Set annotations = FunctionAnnotation.readDualConstantAnnotations(udfClass); - - DualInputSemanticProperties dsp = SemanticPropUtil.getSemanticPropsDual(annotations, - getInput1Type(), getInput2Type(), getResultType()); - - setSemanticProperties(dsp); - } + protected abstract Function getFunction(); // -------------------------------------------------------------------------------------------- // Fluent API methods @@ -172,6 +169,121 @@ public O withConstantSetSecond(String... constantSetSecond) { O returnType = (O) this; return returnType; } + + /** + * Adds a type information hint about the return type of this operator. + * + *

+ * Type hints are important in cases where the Java compiler + * throws away generic type information necessary for efficient execution. + * + *

+ * This method takes a type information string that will be parsed. A type information string can contain the following + * types: + * + *

    + *
  • Basic types such as Integer, String, etc. + *
  • Basic type arrays such as Integer[], + * String[], etc. + *
  • Tuple types such as Tuple1<TYPE0>, + * Tuple2<TYPE0, TYPE1>, etc.
  • + *
  • POJO types such as org.my.MyPojo<myFieldName=TYPE0,myFieldName2=TYPE1>, etc.
  • + *
  • Generic types such as java.lang.Class, etc. + *
  • Custom type arrays such as org.my.CustomClass[], + * org.my.CustomClass$StaticInnerClass[], etc. + *
  • Value types such as DoubleValue, + * StringValue, IntegerValue, etc.
  • + *
  • Tuple array types such as Tuple2<TYPE0,TYPE1>[], etc.
  • + *
  • Writable types such as Writable<org.my.CustomWritable>
  • + *
  • Enum types such as Enum<org.my.CustomEnum>
  • + *
+ * + * Example: + * "Tuple2<String,Tuple2<Integer,org.my.MyJob$Pojo<word=String>>>" + * + * @param typeInfoString + * type information string to be parsed + * @return This operator with a given return type hint. + */ + public O returns(String typeInfoString) { + if (typeInfoString == null) { + throw new IllegalArgumentException("Type information string must not be null."); + } + return returns(TypeInfoParser.parse(typeInfoString)); + } + + /** + * Adds a type information hint about the return type of this operator. + * + *

+ * Type hints are important in cases where the Java compiler + * throws away generic type information necessary for efficient execution. + * + *

+ * This method takes an instance of {@link org.apache.flink.api.common.typeinfo.TypeInformation} such as: + * + *

    + *
  • {@link org.apache.flink.api.common.typeinfo.BasicTypeInfo}
  • + *
  • {@link org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo}
  • + *
  • {@link org.apache.flink.api.java.typeutils.TupleTypeInfo}
  • + *
  • {@link org.apache.flink.api.java.typeutils.PojoTypeInfo}
  • + *
  • {@link org.apache.flink.api.java.typeutils.WritableTypeInfo}
  • + *
  • {@link org.apache.flink.api.java.typeutils.ValueTypeInfo}
  • + *
  • etc.
  • + *
+ * + * @param typeInfo + * type information as a return type hint + * @return This operator with a given return type hint. + */ + public O returns(TypeInformation typeInfo) { + if (typeInfo == null) { + throw new IllegalArgumentException("Type information must not be null."); + } + fillInType(typeInfo); + + @SuppressWarnings("unchecked") + O returnType = (O) this; + return returnType; + } + + /** + * Adds a type information hint about the return type of this operator. + * + *

+ * Type hints are important in cases where the Java compiler + * throws away generic type information necessary for efficient execution. + * + *

+ * This method takes a class that will be analyzed by Flink's type extraction capabilities. + * + *

+ * Examples for classes are: + *

    + *
  • Basic types such as Integer.class, String.class, etc.
  • + *
  • POJOs such as MyPojo.class
  • + *
  • Classes that extend tuples. Classes like Tuple1.class, Tuple2.class, etc. are not sufficient.
  • + *
  • Arrays such as String[].class, etc.
  • + *
+ * + * @param typeClass + * class as a return type hint + * @return This operator with a given return type hint. + */ + @SuppressWarnings("unchecked") + public O returns(Class typeClass) { + if (typeClass == null) { + throw new IllegalArgumentException("Type class must not be null."); + } + + try { + TypeInformation ti = (TypeInformation) TypeExtractor.createTypeInfo(typeClass); + return returns(ti); + } + catch (InvalidTypesException e) { + throw new InvalidTypesException("The given class is not suited for providing necessary type information.", e); + } + } // -------------------------------------------------------------------------------------------- // Accessors @@ -191,6 +303,11 @@ public Configuration getParameters() { @Override public DualInputSemanticProperties getSemanticProperties() { + if (udfSemantics == null) { + DualInputSemanticProperties props = extractSemanticAnnotationsFromUdf(getFunction().getClass()); + udfSemantics = props != null ? props : new DualInputSemanticProperties(); + } + return this.udfSemantics; } @@ -205,4 +322,10 @@ public DualInputSemanticProperties getSemanticProperties() { public void setSemanticProperties(DualInputSemanticProperties properties) { this.udfSemantics = properties; } + + + protected DualInputSemanticProperties extractSemanticAnnotationsFromUdf(Class udfClass) { + Set annotations = FunctionAnnotation.readDualConstantAnnotations(udfClass); + return SemanticPropUtil.getSemanticPropsDual(annotations, getInput1Type(), getInput2Type(), getResultType()); + } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java index c6f72f2cf3d91..001c46b9e9eb7 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java @@ -16,9 +16,9 @@ * limitations under the License. 
*/ - package org.apache.flink.api.java.operators; +import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.Union; import org.apache.flink.api.java.DataSet; @@ -31,6 +31,7 @@ public class UnionOperator extends TwoInputOperator> { private final String unionLocationName; + /** * Create an operator that produces the union of the two given data sets. * @@ -40,6 +41,11 @@ public class UnionOperator extends TwoInputOperator public UnionOperator(DataSet input1, DataSet input2, String unionLocationName) { super(input1, input2, input1.getType()); + if (!input1.getType().equals(input2.getType())) { + throw new InvalidProgramException("Cannot union inputs of different types. Input1=" + + input1.getType() + ", input2=" + input2.getType()); + } + this.unionLocationName = unionLocationName; } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java new file mode 100644 index 0000000000000..10ab02f29d1c0 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +/** + * A special type information signifying that the type extraction failed. It contains + * additional error information. + */ +public class MissingTypeInfo extends TypeInformation { + + private String functionName; + private InvalidTypesException typeException; + + + public MissingTypeInfo(String functionName) { + this(functionName, new InvalidTypesException("An unknown error occured.")); + } + + public MissingTypeInfo(String functionName, InvalidTypesException typeException) { + this.functionName = functionName; + this.typeException = typeException; + } + + // -------------------------------------------------------------------------------------------- + + public String getFunctionName() { + return functionName; + } + + public InvalidTypesException getTypeException() { + return typeException; + } + + // -------------------------------------------------------------------------------------------- + + @Override + public boolean isBasicType() { + throw new UnsupportedOperationException("The missing type information cannot be used as a type information."); + } + + @Override + public boolean isTupleType() { + throw new UnsupportedOperationException("The missing type information cannot be used as a type information."); + } + + @Override + public int getArity() { + throw new UnsupportedOperationException("The missing type information cannot be used as a type information."); + } + + @Override + public Class getTypeClass() { + throw new UnsupportedOperationException("The missing type information cannot be used as a type information."); + } + + @Override + public boolean isKeyType() { + throw new 
UnsupportedOperationException("The missing type information cannot be used as a type information."); + } + + @Override + public TypeSerializer createSerializer() { + throw new UnsupportedOperationException("The missing type information cannot be used as a type information."); + } + + @Override + public int getTotalFields() { + throw new UnsupportedOperationException("The missing type information cannot be used as a type information."); + } +} diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java index 4d8a81e949355..22f794293b551 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java @@ -75,7 +75,7 @@ public Class getTypeClass() { /** * Recursively add all fields in this tuple type. We need this in particular to get all * the types. - * @param keyId + * @param startKeyId * @param keyFields */ public void addAllFields(int startKeyId, List keyFields) { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index b528d00d95a2e..edff09c3332b5 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -59,7 +59,12 @@ import com.google.common.base.Preconditions; +/** + * A utility for reflection analysis on classes, to determine the return type of implementations of transformation + * functions. 
+ */ public class TypeExtractor { + private static final Logger LOG = LoggerFactory.getLogger(TypeExtractor.class); // We need this to detect recursive types and not get caught @@ -75,108 +80,211 @@ private TypeExtractor() { // -------------------------------------------------------------------------------------------- public static TypeInformation getMapReturnTypes(MapFunction mapInterface, TypeInformation inType) { - return getUnaryOperatorReturnType((Function) mapInterface, MapFunction.class, false, false, inType); + return getMapReturnTypes(mapInterface, inType, null, false); } + public static TypeInformation getMapReturnTypes(MapFunction mapInterface, TypeInformation inType, + String functionName, boolean allowMissing) + { + return getUnaryOperatorReturnType((Function) mapInterface, MapFunction.class, false, false, inType, functionName, allowMissing); + } + + public static TypeInformation getFlatMapReturnTypes(FlatMapFunction flatMapInterface, TypeInformation inType) { - return getUnaryOperatorReturnType((Function) flatMapInterface, FlatMapFunction.class, false, true, inType); + return getFlatMapReturnTypes(flatMapInterface, inType, null, false); } + public static TypeInformation getFlatMapReturnTypes(FlatMapFunction flatMapInterface, TypeInformation inType, + String functionName, boolean allowMissing) + { + return getUnaryOperatorReturnType((Function) flatMapInterface, FlatMapFunction.class, false, true, inType, functionName, allowMissing); + } + + public static TypeInformation getMapPartitionReturnTypes(MapPartitionFunction mapPartitionInterface, TypeInformation inType) { - return getUnaryOperatorReturnType((Function) mapPartitionInterface, MapPartitionFunction.class, true, true, inType); + return getMapPartitionReturnTypes(mapPartitionInterface, inType, null, false); } - public static TypeInformation getGroupReduceReturnTypes(GroupReduceFunction groupReduceInterface, - TypeInformation inType) { - return getUnaryOperatorReturnType((Function) 
groupReduceInterface, GroupReduceFunction.class, true, true, inType); + public static TypeInformation getMapPartitionReturnTypes(MapPartitionFunction mapPartitionInterface, TypeInformation inType, + String functionName, boolean allowMissing) + { + return getUnaryOperatorReturnType((Function) mapPartitionInterface, MapPartitionFunction.class, true, true, inType, functionName, allowMissing); } + + public static TypeInformation getGroupReduceReturnTypes(GroupReduceFunction groupReduceInterface, TypeInformation inType) { + return getGroupReduceReturnTypes(groupReduceInterface, inType, null, false); + } + + public static TypeInformation getGroupReduceReturnTypes(GroupReduceFunction groupReduceInterface, TypeInformation inType, + String functionName, boolean allowMissing) + { + return getUnaryOperatorReturnType((Function) groupReduceInterface, GroupReduceFunction.class, true, true, inType, functionName, allowMissing); + } + + public static TypeInformation getFlatJoinReturnTypes(FlatJoinFunction joinInterface, - TypeInformation in1Type, TypeInformation in2Type) { - return getBinaryOperatorReturnType((Function) joinInterface, FlatJoinFunction.class, false, true, in1Type, in2Type); + TypeInformation in1Type, TypeInformation in2Type) + { + return getFlatJoinReturnTypes(joinInterface, in1Type, in2Type, null, false); } + public static TypeInformation getFlatJoinReturnTypes(FlatJoinFunction joinInterface, + TypeInformation in1Type, TypeInformation in2Type, String functionName, boolean allowMissing) + { + return getBinaryOperatorReturnType((Function) joinInterface, FlatJoinFunction.class, false, true, + in1Type, in2Type, functionName, allowMissing); + } + + public static TypeInformation getJoinReturnTypes(JoinFunction joinInterface, - TypeInformation in1Type, TypeInformation in2Type) { - return getBinaryOperatorReturnType((Function) joinInterface, JoinFunction.class, false, false, in1Type, in2Type); + TypeInformation in1Type, TypeInformation in2Type) + { + return 
getJoinReturnTypes(joinInterface, in1Type, in2Type, null, false); + } + + public static TypeInformation getJoinReturnTypes(JoinFunction joinInterface, + TypeInformation in1Type, TypeInformation in2Type, String functionName, boolean allowMissing) + { + return getBinaryOperatorReturnType((Function) joinInterface, JoinFunction.class, false, false, + in1Type, in2Type, functionName, allowMissing); } + public static TypeInformation getCoGroupReturnTypes(CoGroupFunction coGroupInterface, - TypeInformation in1Type, TypeInformation in2Type) { - return getBinaryOperatorReturnType((Function) coGroupInterface, CoGroupFunction.class, true, true, in1Type, in2Type); + TypeInformation in1Type, TypeInformation in2Type) + { + return getCoGroupReturnTypes(coGroupInterface, in1Type, in2Type, null, false); } + public static TypeInformation getCoGroupReturnTypes(CoGroupFunction coGroupInterface, + TypeInformation in1Type, TypeInformation in2Type, String functionName, boolean allowMissing) + { + return getBinaryOperatorReturnType((Function) coGroupInterface, CoGroupFunction.class, true, true, + in1Type, in2Type, functionName, allowMissing); + } + + public static TypeInformation getCrossReturnTypes(CrossFunction crossInterface, - TypeInformation in1Type, TypeInformation in2Type) { - return getBinaryOperatorReturnType((Function) crossInterface, CrossFunction.class, false, false, in1Type, in2Type); + TypeInformation in1Type, TypeInformation in2Type) + { + return getCrossReturnTypes(crossInterface, in1Type, in2Type, null, false); + } + + public static TypeInformation getCrossReturnTypes(CrossFunction crossInterface, + TypeInformation in1Type, TypeInformation in2Type, String functionName, boolean allowMissing) + { + return getBinaryOperatorReturnType((Function) crossInterface, CrossFunction.class, false, false, + in1Type, in2Type, functionName, allowMissing); } + public static TypeInformation getKeySelectorTypes(KeySelector selectorInterface, TypeInformation inType) { - return 
getUnaryOperatorReturnType((Function) selectorInterface, KeySelector.class, false, false, inType); + return getKeySelectorTypes(selectorInterface, inType, null, false); + } + + public static TypeInformation getKeySelectorTypes(KeySelector selectorInterface, + TypeInformation inType, String functionName, boolean allowMissing) + { + return getUnaryOperatorReturnType((Function) selectorInterface, KeySelector.class, false, false, inType, functionName, allowMissing); } + public static TypeInformation getPartitionerTypes(Partitioner partitioner) { + return getPartitionerTypes(partitioner, null, false); + } + + public static TypeInformation getPartitionerTypes(Partitioner partitioner, String functionName, boolean allowMissing) { return new TypeExtractor().privateCreateTypeInfo(Partitioner.class, partitioner.getClass(), 0, null, null); } + @SuppressWarnings("unchecked") public static TypeInformation getInputFormatTypes(InputFormat inputFormatInterface) { - if(inputFormatInterface instanceof ResultTypeQueryable) { + if (inputFormatInterface instanceof ResultTypeQueryable) { return ((ResultTypeQueryable) inputFormatInterface).getProducedType(); } return new TypeExtractor().privateCreateTypeInfo(InputFormat.class, inputFormatInterface.getClass(), 0, null, null); } + // -------------------------------------------------------------------------------------------- + // Generic extraction methods + // -------------------------------------------------------------------------------------------- + @SuppressWarnings("unchecked") - private static TypeInformation getUnaryOperatorReturnType(Function function, Class baseClass, boolean hasIterable, boolean hasCollector, TypeInformation inType) { - final Method m = FunctionUtils.checkAndExtractLambdaMethod(function); - if (m != null) { - // check for lambda type erasure - validateLambdaGenericParameters(m); - - // parameters must be accessed from behind, since JVM can add additional parameters e.g. 
when using local variables inside lambda function - final int paramLen = m.getGenericParameterTypes().length - 1; - final Type input = (hasCollector)? m.getGenericParameterTypes()[paramLen - 1] : m.getGenericParameterTypes()[paramLen]; - validateInputType((hasIterable)?removeGenericWrapper(input) : input, inType); - if(function instanceof ResultTypeQueryable) { - return ((ResultTypeQueryable) function).getProducedType(); + private static TypeInformation getUnaryOperatorReturnType(Function function, Class baseClass, + boolean hasIterable, boolean hasCollector, TypeInformation inType, + String functionName, boolean allowMissing) + { + try { + final Method m = FunctionUtils.checkAndExtractLambdaMethod(function); + if (m != null) { + // check for lambda type erasure + validateLambdaGenericParameters(m); + + // parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function + final int paramLen = m.getGenericParameterTypes().length - 1; + final Type input = (hasCollector)? m.getGenericParameterTypes()[paramLen - 1] : m.getGenericParameterTypes()[paramLen]; + validateInputType((hasIterable)?removeGenericWrapper(input) : input, inType); + if(function instanceof ResultTypeQueryable) { + return ((ResultTypeQueryable) function).getProducedType(); + } + return new TypeExtractor().privateCreateTypeInfo((hasCollector)? removeGenericWrapper(m.getGenericParameterTypes()[paramLen]) : m.getGenericReturnType(), inType, null); + } + else { + validateInputType(baseClass, function.getClass(), 0, inType); + if(function instanceof ResultTypeQueryable) { + return ((ResultTypeQueryable) function).getProducedType(); + } + return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), 1, inType, null); } - return new TypeExtractor().privateCreateTypeInfo((hasCollector)? 
removeGenericWrapper(m.getGenericParameterTypes()[paramLen]) : m.getGenericReturnType(), inType, null); } - else { - validateInputType(baseClass, function.getClass(), 0, inType); - if(function instanceof ResultTypeQueryable) { - return ((ResultTypeQueryable) function).getProducedType(); + catch (InvalidTypesException e) { + if (allowMissing) { + return (TypeInformation) new MissingTypeInfo(functionName != null ? functionName : function.toString(), e); + } else { + throw e; } - return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), 1, inType, null); } } @SuppressWarnings("unchecked") - private static TypeInformation getBinaryOperatorReturnType(Function function, Class baseClass, boolean hasIterables, boolean hasCollector, TypeInformation in1Type, TypeInformation in2Type) { - final Method m = FunctionUtils.checkAndExtractLambdaMethod(function); - if (m != null) { - // check for lambda type erasure - validateLambdaGenericParameters(m); - - // parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function - final int paramLen = m.getGenericParameterTypes().length - 1; - final Type input1 = (hasCollector)? m.getGenericParameterTypes()[paramLen - 2] : m.getGenericParameterTypes()[paramLen - 1]; - final Type input2 = (hasCollector)? m.getGenericParameterTypes()[paramLen - 1] : m.getGenericParameterTypes()[paramLen]; - validateInputType((hasIterables)? removeGenericWrapper(input1) : input1, in1Type); - validateInputType((hasIterables)? 
removeGenericWrapper(input2) : input2, in2Type); - if(function instanceof ResultTypeQueryable) { - return ((ResultTypeQueryable) function).getProducedType(); + private static TypeInformation getBinaryOperatorReturnType(Function function, Class baseClass, + boolean hasIterables, boolean hasCollector, TypeInformation in1Type, TypeInformation in2Type, + String functionName, boolean allowMissing) + { + try { + final Method m = FunctionUtils.checkAndExtractLambdaMethod(function); + if (m != null) { + // check for lambda type erasure + validateLambdaGenericParameters(m); + + // parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function + final int paramLen = m.getGenericParameterTypes().length - 1; + final Type input1 = (hasCollector)? m.getGenericParameterTypes()[paramLen - 2] : m.getGenericParameterTypes()[paramLen - 1]; + final Type input2 = (hasCollector)? m.getGenericParameterTypes()[paramLen - 1] : m.getGenericParameterTypes()[paramLen]; + validateInputType((hasIterables)? removeGenericWrapper(input1) : input1, in1Type); + validateInputType((hasIterables)? removeGenericWrapper(input2) : input2, in2Type); + if(function instanceof ResultTypeQueryable) { + return ((ResultTypeQueryable) function).getProducedType(); + } + return new TypeExtractor().privateCreateTypeInfo((hasCollector)? removeGenericWrapper(m.getGenericParameterTypes()[paramLen]) : m.getGenericReturnType(), in1Type, in2Type); + } + else { + validateInputType(baseClass, function.getClass(), 0, in1Type); + validateInputType(baseClass, function.getClass(), 1, in2Type); + if(function instanceof ResultTypeQueryable) { + return ((ResultTypeQueryable) function).getProducedType(); + } + return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), 2, in1Type, in2Type); } - return new TypeExtractor().privateCreateTypeInfo((hasCollector)? 
removeGenericWrapper(m.getGenericParameterTypes()[paramLen]) : m.getGenericReturnType(), in1Type, in2Type); } - else { - validateInputType(baseClass, function.getClass(), 0, in1Type); - validateInputType(baseClass, function.getClass(), 1, in2Type); - if(function instanceof ResultTypeQueryable) { - return ((ResultTypeQueryable) function).getProducedType(); + catch (InvalidTypesException e) { + if (allowMissing) { + return (TypeInformation) new MissingTypeInfo(functionName != null ? functionName : function.toString(), e); + } else { + throw e; } - return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), 2, in1Type, in2Type); } } @@ -185,12 +293,20 @@ private static TypeInformation getBinaryOperatorReturnType( // -------------------------------------------------------------------------------------------- public static TypeInformation createTypeInfo(Type t) { - return new TypeExtractor().privateCreateTypeInfo(t); + TypeInformation ti = new TypeExtractor().privateCreateTypeInfo(t); + if (ti == null) { + throw new InvalidTypesException("Could not extract type information."); + } + return ti; } public static TypeInformation createTypeInfo(Class baseClass, Class clazz, int returnParamPos, TypeInformation in1Type, TypeInformation in2Type) { - return new TypeExtractor().privateCreateTypeInfo(baseClass, clazz, returnParamPos, in1Type, in2Type); + TypeInformation ti = new TypeExtractor().privateCreateTypeInfo(baseClass, clazz, returnParamPos, in1Type, in2Type); + if (ti == null) { + throw new InvalidTypesException("Could not extract type information."); + } + return ti; } // ----------------------------------- private methods ---------------------------------------- @@ -550,7 +666,6 @@ private static void validateInputType(Class baseClass, Class clazz, int in @SuppressWarnings("unchecked") private static void validateInfo(ArrayList typeHierarchy, Type type, TypeInformation typeInfo) { - if (type == null) { throw new InvalidTypesException("Unknown Error. 
Type is null."); } @@ -784,29 +899,28 @@ private static void validateLambdaGenericParameter(Type t) { } private static String encodePrimitiveClass(Class primitiveClass) { - final String name = primitiveClass.getName(); - if (name.equals("boolean")) { + if (primitiveClass == boolean.class) { return "Z"; } - else if (name.equals("byte")) { + else if (primitiveClass == byte.class) { return "B"; } - else if (name.equals("char")) { + else if (primitiveClass == char.class) { return "C"; } - else if (name.equals("double")) { + else if (primitiveClass == double.class) { return "D"; } - else if (name.equals("float")) { + else if (primitiveClass == float.class) { return "F"; } - else if (name.equals("int")) { + else if (primitiveClass == int.class) { return "I"; } - else if (name.equals("long")) { + else if (primitiveClass == long.class) { return "J"; } - else if (name.equals("short")) { + else if (primitiveClass == short.class) { return "S"; } throw new InvalidTypesException(); @@ -972,7 +1086,6 @@ private TypeInformation privateGetForClass(Class clazz, ArrayList clazz, ArrayList typeHierarchy) { if(Modifier.isPublic(f.getModifiers())) { @@ -1028,6 +1141,7 @@ private boolean isValidPojoField(Field f, Class clazz, ArrayList typeHi } } + @SuppressWarnings("unchecked") private TypeInformation analyzePojo(Class clazz, ArrayList typeHierarchy, ParameterizedType clazzTypeHint) { // try to create Type hierarchy, if the incoming only contains the most bottom one or none. 
if(typeHierarchy.size() <= 1) { @@ -1054,7 +1168,7 @@ private TypeInformation analyzePojo(Class clazz, ArrayList typeH if(isClassType(fieldType)) { genericClass = typeToClass(fieldType); } - pojoFields.add(new PojoField(field, new GenericTypeInfo( genericClass ))); + pojoFields.add(new PojoField(field, new GenericTypeInfo( (Class) genericClass ))); } } @@ -1117,7 +1231,6 @@ private static boolean hasFieldWithSameName(String name, List fields) { } return false; } - // recursively determine all declared methods diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java index d088d16e2ef34..e9d5dac52a757 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java @@ -19,6 +19,8 @@ package org.apache.flink.api.java.typeutils; +import java.lang.reflect.Field; +import java.util.ArrayList; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -36,6 +38,7 @@ public class TypeInfoParser { private static final Pattern tuplePattern = Pattern.compile("^((" + TUPLE_PACKAGE.replaceAll("\\.", "\\\\.") + "\\.)?Tuple[0-9]+)<"); private static final Pattern writablePattern = Pattern.compile("^((" + WRITABLE_PACKAGE.replaceAll("\\.", "\\\\.") + "\\.)?Writable)<([^\\s,>]*)(,|>|$)"); + private static final Pattern enumPattern = Pattern.compile("^((java\\.lang\\.)?Enum)<([^\\s,>]*)(,|>|$)"); private static final Pattern basicTypePattern = Pattern .compile("^((java\\.lang\\.)?(String|Integer|Byte|Short|Character|Double|Float|Long|Boolean))(,|>|$)"); private static final Pattern basicType2Pattern = Pattern.compile("^(int|byte|short|char|double|float|long|boolean)(,|>|$)"); @@ -44,7 +47,8 @@ public class TypeInfoParser { private static final Pattern basicArrayTypePattern = Pattern 
.compile("^((java\\.lang\\.)?(String|Integer|Byte|Short|Character|Double|Float|Long|Boolean))\\[\\](,|>|$)"); private static final Pattern basicArrayType2Pattern = Pattern.compile("^(int|byte|short|char|double|float|long|boolean)\\[\\](,|>|$)"); - private static final Pattern customObjectPattern = Pattern.compile("^([^\\s,>]+)(,|>|$)"); + private static final Pattern pojoGenericObjectPattern = Pattern.compile("^([^\\s,<>]+)(<)?"); + private static final Pattern fieldPattern = Pattern.compile("^([^\\s,<>]+)="); /** * Generates an instance of TypeInformation by parsing a type @@ -57,18 +61,19 @@ public class TypeInfoParser { * String[], etc. *
  • Tuple types such as Tuple1<TYPE0>, * Tuple2<TYPE0, TYPE1>, etc.
  • - *
  • Custom types such as org.my.CustomClass, - * org.my.CustomClass$StaticInnerClass, etc. + *
  • Pojo types such as org.my.MyPojo<myFieldName=TYPE0,myFieldName2=TYPE1>, etc.
  • + *
  • Generic types such as java.lang.Class, etc. *
  • Custom type arrays such as org.my.CustomClass[], * org.my.CustomClass$StaticInnerClass[], etc. *
  • Value types such as DoubleValue, * StringValue, IntegerValue, etc.
  • - *
  • Tuple array types such as Tuple2[], etc.
  • + *
  • Tuple array types such as Tuple2<TYPE0,TYPE1>[], etc.
  • *
  • Writable types such as Writable<org.my.CustomWritable>
  • + *
  • Enum types such as Enum<org.my.CustomEnum>
  • * * * Example: - * "Tuple2<String,Tuple2<Integer,org.my.MyClass>>" + * "Tuple2<String,Tuple2<Integer,org.my.MyJob$Pojo<word=String>>>" * * @param infoString * type information string to be parsed @@ -97,6 +102,7 @@ private static TypeInformation parse(StringBuilder sb) throws ClassNotFoundEx final Matcher tupleMatcher = tuplePattern.matcher(infoString); final Matcher writableMatcher = writablePattern.matcher(infoString); + final Matcher enumMatcher = enumPattern.matcher(infoString); final Matcher basicTypeMatcher = basicTypePattern.matcher(infoString); final Matcher basicType2Matcher = basicType2Pattern.matcher(infoString); @@ -106,7 +112,7 @@ private static TypeInformation parse(StringBuilder sb) throws ClassNotFoundEx final Matcher basicArrayTypeMatcher = basicArrayTypePattern.matcher(infoString); final Matcher basicArrayType2Matcher = basicArrayType2Pattern.matcher(infoString); - final Matcher customObjectMatcher = customObjectPattern.matcher(infoString); + final Matcher pojoGenericMatcher = pojoGenericObjectPattern.matcher(infoString); if (infoString.length() == 0) { return null; @@ -163,15 +169,17 @@ private static TypeInformation parse(StringBuilder sb) throws ClassNotFoundEx else if (writableMatcher.find()) { String className = writableMatcher.group(1); String fullyQualifiedName = writableMatcher.group(3); - sb.delete(0, className.length() + 1 + fullyQualifiedName.length()); - - try { - Class clazz = Class.forName(fullyQualifiedName); - returnType = WritableTypeInfo.getWritableTypeInfo((Class) clazz); - } catch (ClassNotFoundException e) { - throw new IllegalArgumentException("Class '" + fullyQualifiedName - + "' could not be found for use as writable type. 
Please note that inner classes must be declared static."); - } + sb.delete(0, className.length() + 1 + fullyQualifiedName.length() + 1); + Class clazz = loadClass(fullyQualifiedName); + returnType = WritableTypeInfo.getWritableTypeInfo((Class) clazz); + } + // enum types + else if (enumMatcher.find()) { + String className = enumMatcher.group(1); + String fullyQualifiedName = enumMatcher.group(3); + sb.delete(0, className.length() + 1 + fullyQualifiedName.length() + 1); + Class clazz = loadClass(fullyQualifiedName); + returnType = new EnumTypeInfo(clazz); } // basic types of classes else if (basicTypeMatcher.find()) { @@ -225,7 +233,7 @@ else if (valueTypeMatcher.find()) { } returnType = ValueTypeInfo.getValueTypeInfo((Class) clazz); } - // array of classes + // array of basic classes else if (basicArrayTypeMatcher.find()) { String className = basicArrayTypeMatcher.group(1); sb.delete(0, className.length() + 2); @@ -263,32 +271,43 @@ else if (basicArrayType2Matcher.find()) { } returnType = PrimitiveArrayTypeInfo.getInfoFor(clazz); } - // custom objects - else if (customObjectMatcher.find()) { - String fullyQualifiedName = customObjectMatcher.group(1); + // pojo objects or generic types + else if (pojoGenericMatcher.find()) { + String fullyQualifiedName = pojoGenericMatcher.group(1); sb.delete(0, fullyQualifiedName.length()); - if (fullyQualifiedName.contains("<")) { - throw new IllegalArgumentException("Parameterized custom classes are not supported by parser."); - } + boolean isPojo = pojoGenericMatcher.group(2) != null; + + if (isPojo) { + sb.deleteCharAt(0); + Class clazz = loadClass(fullyQualifiedName); + + ArrayList fields = new ArrayList(); + while (sb.charAt(0) != '>') { + final Matcher fieldMatcher = fieldPattern.matcher(sb); + if (!fieldMatcher.find()) { + throw new IllegalArgumentException("Field name missing."); + } + String fieldName = fieldMatcher.group(1); + sb.delete(0, fieldName.length() + 1); - // custom object array - if 
(fullyQualifiedName.endsWith("[]")) { - fullyQualifiedName = fullyQualifiedName.substring(0, fullyQualifiedName.length() - 2); - try { - Class clazz = Class.forName("[L" + fullyQualifiedName + ";"); - returnType = ObjectArrayTypeInfo.getInfoFor(clazz); - } catch (ClassNotFoundException e) { - throw new IllegalArgumentException("Class '" + fullyQualifiedName - + "' could not be found for use as object array. Please note that inner classes must be declared static."); + Field field = null; + try { + field = clazz.getDeclaredField(fieldName); + } catch (Exception e) { + throw new IllegalArgumentException("Field '" + fieldName + "'could not be accessed."); + } + fields.add(new PojoField(field, parse(sb))); } - } else { - try { - Class clazz = Class.forName(fullyQualifiedName); - returnType = new GenericTypeInfo(clazz); - } catch (ClassNotFoundException e) { - throw new IllegalArgumentException("Class '" + fullyQualifiedName - + "' could not be found for use as custom object. Please note that inner classes must be declared static."); + returnType = new PojoTypeInfo(clazz, fields); + } + else { + // custom object array + if (fullyQualifiedName.endsWith("[]")) { + fullyQualifiedName = fullyQualifiedName.substring(0, fullyQualifiedName.length() - 2); + returnType = ObjectArrayTypeInfo.getInfoFor(loadClass("[L" + fullyQualifiedName + ";")); + } else { + returnType = new GenericTypeInfo(loadClass(fullyQualifiedName)); } } } @@ -304,4 +323,13 @@ else if (customObjectMatcher.find()) { } } + private static Class loadClass(String fullyQualifiedName) { + try { + return Class.forName(fullyQualifiedName); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException("Class '" + fullyQualifiedName + + "' could not be found. 
Please note that inner classes must be declared static."); + } + } + } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java similarity index 99% rename from flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java rename to flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java index d06731ebc3f6b..6eb536d813c08 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java @@ -46,7 +46,7 @@ import static org.junit.Assert.fail; @SuppressWarnings("serial") -public class DistrinctTranslationTest { +public class DistinctTranslationTest { @Test public void testCombinable() { diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java index 23a4d5fcbe168..9834a25a3fa1f 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java @@ -44,7 +44,7 @@ import org.apache.flink.util.Collector; import org.junit.Test; -@SuppressWarnings("serial") +@SuppressWarnings({ "serial", "deprecation" }) public class CoGroupWrappingFunctionTest { @SuppressWarnings("unchecked") diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java index 7b28c3fd288c8..9e262fcfc7a6c 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java 
+++ b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java @@ -43,7 +43,7 @@ import org.apache.flink.util.Collector; import org.junit.Test; -@SuppressWarnings("serial") +@SuppressWarnings({ "serial", "deprecation" }) public class ReduceWrappingFunctionTest { @SuppressWarnings("unchecked") diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java index 96ba16b46e294..39d6e10e76f2a 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java @@ -27,7 +27,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; -import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple3; @@ -53,6 +52,7 @@ public class PojoTypeExtractionTest { public static class HasDuplicateField extends WC { + @SuppressWarnings("unused") private int count; // duplicate } @@ -614,6 +614,6 @@ public VertexTyped() { @Test public void testGetterSetterWithVertex() { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet set = env.fromElements(new VertexTyped(0L, 3.0), new VertexTyped(1L, 1.0)); + env.fromElements(new VertexTyped(0L, 3.0), new VertexTyped(1L, 1.0)); } -} \ No newline at end of file +} diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java index f8fb13c923e4a..1364a2f7738bd 100644 --- 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java @@ -45,6 +45,7 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple9; import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.MissingTypeInfo; import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; @@ -626,7 +627,7 @@ public ChainedFour map(ChainedFour value) throws Exception { @SuppressWarnings({ "unchecked", "rawtypes" }) @Test - public void testMissingTupleGenericsException() { + public void testMissingTupleGenerics() { RichMapFunction function = new RichMapFunction() { private static final long serialVersionUID = 1L; @@ -636,11 +637,15 @@ public Tuple2 map(String value) throws Exception { } }; + TypeInformation ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String"), "name", true); + Assert.assertTrue(ti instanceof MissingTypeInfo); + try { TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String")); - Assert.fail("exception expected"); - } catch (InvalidTypesException e) { - // right + Assert.fail("Expected an exception"); + } + catch (InvalidTypesException e) { + // expected } } @@ -656,11 +661,15 @@ public Tuple map(String value) throws Exception { } }; + TypeInformation ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String"), "name", true); + Assert.assertTrue(ti instanceof MissingTypeInfo); + try { TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String")); - Assert.fail("exception expected"); - } catch (InvalidTypesException e) { - // right + Assert.fail("Expected an exception"); + } + catch (InvalidTypesException e) 
{ + // expected } } @@ -795,11 +804,15 @@ public String map(Object value) throws Exception { } }; + TypeInformation ti = TypeExtractor.getMapReturnTypes(function, TypeInfoParser.parse("String"), "name", true); + Assert.assertTrue(ti instanceof MissingTypeInfo); + try { TypeExtractor.getMapReturnTypes(function, TypeInfoParser.parse("String")); - Assert.fail("exception expected"); - } catch (InvalidTypesException e) { - // right + Assert.fail("Expected an exception"); + } + catch (InvalidTypesException e) { + // expected } } @@ -893,14 +906,18 @@ public V map(T value) throws Exception { } @Test - public void testFunctionDependingOnInputException() { + public void testFunctionDependingOnUnknownInput() { IdentityMapper3 function = new IdentityMapper3(); + TypeInformation ti = TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.BOOLEAN_TYPE_INFO, "name", true); + Assert.assertTrue(ti instanceof MissingTypeInfo); + try { TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.BOOLEAN_TYPE_INFO); - Assert.fail("exception expected"); - } catch (InvalidTypesException e) { - // right + Assert.fail("Expected an exception"); + } + catch (InvalidTypesException e) { + // expected } } @@ -1072,12 +1089,8 @@ public Testable map(String value) throws Exception { } }; - try { - TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.STRING_TYPE_INFO); - Assert.fail("exception expected"); - } catch (InvalidTypesException e) { - // good - } + TypeInformation ti = TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.STRING_TYPE_INFO, null, true); + Assert.assertTrue(ti instanceof MissingTypeInfo); RichMapFunction function2 = new RichMapFunction() { private static final long serialVersionUID = 1L; @@ -1088,12 +1101,8 @@ public AbstractClass map(String value) throws Exception { } }; - try { - TypeExtractor.getMapReturnTypes(function2, BasicTypeInfo.STRING_TYPE_INFO); - Assert.fail("exception expected"); - } catch (InvalidTypesException e) { - // slick! 
- } + TypeInformation ti2 = TypeExtractor.getMapReturnTypes(function2, BasicTypeInfo.STRING_TYPE_INFO, null, true); + Assert.assertTrue(ti2 instanceof MissingTypeInfo); } @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -1108,11 +1117,15 @@ public Value map(StringValue value) throws Exception { } }; + TypeInformation ti =TypeExtractor.getMapReturnTypes(function, (TypeInformation)TypeInfoParser.parse("StringValue"), "name", true); + Assert.assertTrue(ti instanceof MissingTypeInfo); + try { TypeExtractor.getMapReturnTypes(function, (TypeInformation)TypeInfoParser.parse("StringValue")); - Assert.fail("exception expected"); - } catch (InvalidTypesException e) { - // bam! go type extractor! + Assert.fail("Expected an exception"); + } + catch (InvalidTypesException e) { + // expected } } @@ -1366,14 +1379,19 @@ public void flatMap(Tuple2 value, Collector> out) throws Exce @SuppressWarnings({ "unchecked", "rawtypes" }) @Test - public void testTypeErasureException() { + public void testTypeErasure() { + TypeInformation ti = TypeExtractor.getFlatMapReturnTypes(new DummyFlatMapFunction(), + (TypeInformation) TypeInfoParser.parse("Tuple2"), "name", true); + Assert.assertTrue(ti instanceof MissingTypeInfo); + try { TypeExtractor.getFlatMapReturnTypes(new DummyFlatMapFunction(), (TypeInformation) TypeInfoParser.parse("Tuple2")); - Assert.fail("exception expected"); + + Assert.fail("Expected an exception"); } catch (InvalidTypesException e) { - // right + // expected } } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java index 16d22a6386f00..9153d8df1d31c 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java @@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import 
org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo; @@ -149,12 +150,55 @@ public void testTuples() { } @Test - public void testCustomType() { + public void testGenericType() { TypeInformation ti = TypeInfoParser.parse("java.lang.Class"); Assert.assertTrue(ti instanceof GenericTypeInfo); Assert.assertEquals(Class.class, ((GenericTypeInfo) ti).getTypeClass()); } + public static class MyPojo { + public Integer basic; + public Tuple2 tuple; + public MyWritable hadoopCitizen; + public String[] array; + } + + @Test + public void testPojoType() { + TypeInformation ti = TypeInfoParser.parse( + "org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyPojo<" + + "basic=Integer," + + "tuple=Tuple2," + + "hadoopCitizen=Writable," + + "array=String[]" + + ">"); + Assert.assertTrue(ti instanceof PojoTypeInfo); + PojoTypeInfo pti = (PojoTypeInfo) ti; + Assert.assertEquals("array", pti.getPojoFieldAt(0).field.getName()); + Assert.assertTrue(pti.getPojoFieldAt(0).type instanceof BasicArrayTypeInfo); + Assert.assertEquals("basic", pti.getPojoFieldAt(1).field.getName()); + Assert.assertTrue(pti.getPojoFieldAt(1).type instanceof BasicTypeInfo); + Assert.assertEquals("hadoopCitizen", pti.getPojoFieldAt(2).field.getName()); + Assert.assertTrue(pti.getPojoFieldAt(2).type instanceof WritableTypeInfo); + Assert.assertEquals("tuple", pti.getPojoFieldAt(3).field.getName()); + Assert.assertTrue(pti.getPojoFieldAt(3).type instanceof TupleTypeInfo); + } + + @Test + public void testPojoType2() { + TypeInformation ti = TypeInfoParser.parse("Tuple2>>"); + Assert.assertTrue(ti instanceof TupleTypeInfo); + TupleTypeInfo tti = (TupleTypeInfo) ti; + Assert.assertTrue(tti.getTypeAt(0) instanceof 
BasicTypeInfo); + Assert.assertTrue(tti.getTypeAt(1) instanceof TupleTypeInfo); + TupleTypeInfo tti2 = (TupleTypeInfo) tti.getTypeAt(1); + Assert.assertTrue(tti2.getTypeAt(0) instanceof BasicTypeInfo); + Assert.assertTrue(tti2.getTypeAt(1) instanceof PojoTypeInfo); + PojoTypeInfo pti = (PojoTypeInfo) tti2.getTypeAt(1); + Assert.assertEquals("basic", pti.getPojoFieldAt(0).field.getName()); + Assert.assertTrue(pti.getPojoFieldAt(0).type instanceof BasicTypeInfo); + } + public static class MyWritable implements Writable { @Override @@ -198,6 +242,19 @@ public void testLargeMixedTuple() { Assert.assertEquals("ObjectArrayTypeInfo>, ValueType, Java Tuple1>>", ti.toString()); } + public static enum MyEnum { + ONE, TWO, THREE + } + + @Test + public void testEnumType() { + TypeInformation ti = TypeInfoParser.parse("Enum"); + Assert.assertEquals("EnumTypeInfo", ti.toString()); + + TypeInformation ti2 = TypeInfoParser.parse("java.lang.Enum"); + Assert.assertEquals("EnumTypeInfo", ti2.toString()); + } + @Test public void testException() { try { diff --git a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java index fa85f8c807142..49a3fe5553b00 100644 --- a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java +++ b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -21,6 +21,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; + import junit.framework.Assert; import org.apache.flink.api.common.functions.CoGroupFunction; @@ -39,6 +40,7 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.MissingTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.TypeInfoParser; @@ -54,23 +56,23 @@ public void testIdentifyLambdas() { @Override public Integer map(String value) { return Integer.parseInt(value); } }; - + MapFunction anonymousFromClass = new RichMapFunction() { @Override public Integer map(String value) { return Integer.parseInt(value); } }; - + MapFunction fromProperClass = new StaticMapper(); - + MapFunction fromDerived = new ToTuple() { @Override public Tuple2 map(Integer value) { return new Tuple2(value, 1L); } }; - + MapFunction lambda = (str) -> Integer.parseInt(str); - + assertNull(FunctionUtils.checkAndExtractLambdaMethod(anonymousFromInterface)); assertNull(FunctionUtils.checkAndExtractLambdaMethod(anonymousFromClass)); assertNull(FunctionUtils.checkAndExtractLambdaMethod(fromProperClass)); @@ -83,158 +85,168 @@ public Tuple2 map(Integer value) { fail(e.getMessage()); } } - - public static class StaticMapper implements MapFunction { + public static class StaticMapper implements MapFunction { @Override public Integer map(String value) { return Integer.parseInt(value); } } - - public interface ToTuple extends MapFunction> { + public interface ToTuple extends MapFunction> { @Override public Tuple2 map(T value) throws Exception; } - + private static final MapFunction STATIC_LAMBDA = (str) -> Integer.parseInt(str); - 
+ public static class MyClass { private String s = "mystring"; - + public MapFunction getMapFunction() { return (i) -> s; } } - + @Test - public void testLambdaWithMemberVariable() { + public void testLambdaWithMemberVariable() { TypeInformation ti = TypeExtractor.getMapReturnTypes(new MyClass().getMapFunction(), TypeInfoParser.parse("Integer")); Assert.assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO); } - + @Test public void testLambdaWithLocalVariable() { String s = "mystring"; final int k = 24; int j = 26; - + MapFunction f = (i) -> s + k + j; - + TypeInformation ti = TypeExtractor.getMapReturnTypes(f, TypeInfoParser.parse("Integer")); Assert.assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO); } - + @Test public void testMapLambda() { MapFunction, Boolean>, Tuple2, String>> f = (i) -> null; - + TypeInformation ti = TypeExtractor.getMapReturnTypes(f, TypeInfoParser.parse("Tuple2, Boolean>")); - Assert.assertTrue(ti.isTupleType()); - Assert.assertEquals(2, ti.getArity()); - Assert.assertTrue(((TupleTypeInfo) ti).getTypeAt(0).isTupleType()); - Assert.assertEquals(((TupleTypeInfo) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + if (!(ti instanceof MissingTypeInfo)) { + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + Assert.assertTrue(((TupleTypeInfo) ti).getTypeAt(0).isTupleType()); + Assert.assertEquals(((TupleTypeInfo) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + } } - + @Test public void testFlatMapLambda() { FlatMapFunction, Boolean>, Tuple2, String>> f = (i, o) -> {}; - + TypeInformation ti = TypeExtractor.getFlatMapReturnTypes(f, TypeInfoParser.parse("Tuple2, Boolean>")); - Assert.assertTrue(ti.isTupleType()); - Assert.assertEquals(2, ti.getArity()); - Assert.assertTrue(((TupleTypeInfo) ti).getTypeAt(0).isTupleType()); - Assert.assertEquals(((TupleTypeInfo) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + if (!(ti instanceof MissingTypeInfo)) { + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, 
ti.getArity()); + Assert.assertTrue(((TupleTypeInfo) ti).getTypeAt(0).isTupleType()); + Assert.assertEquals(((TupleTypeInfo) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + } } - + @Test public void testMapPartitionLambda() { MapPartitionFunction, Boolean>, Tuple2, String>> f = (i, o) -> {}; - + TypeInformation ti = TypeExtractor.getMapPartitionReturnTypes(f, TypeInfoParser.parse("Tuple2, Boolean>")); - Assert.assertTrue(ti.isTupleType()); - Assert.assertEquals(2, ti.getArity()); - Assert.assertTrue(((TupleTypeInfo) ti).getTypeAt(0).isTupleType()); - Assert.assertEquals(((TupleTypeInfo) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + if (!(ti instanceof MissingTypeInfo)) { + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + Assert.assertTrue(((TupleTypeInfo) ti).getTypeAt(0).isTupleType()); + Assert.assertEquals(((TupleTypeInfo) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + } } - + @Test public void testGroupReduceLambda() { GroupReduceFunction, Boolean>, Tuple2, String>> f = (i, o) -> {}; - + TypeInformation ti = TypeExtractor.getGroupReduceReturnTypes(f, TypeInfoParser.parse("Tuple2, Boolean>")); - Assert.assertTrue(ti.isTupleType()); - Assert.assertEquals(2, ti.getArity()); - Assert.assertTrue(((TupleTypeInfo) ti).getTypeAt(0).isTupleType()); - Assert.assertEquals(((TupleTypeInfo) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + if (!(ti instanceof MissingTypeInfo)) { + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + Assert.assertTrue(((TupleTypeInfo) ti).getTypeAt(0).isTupleType()); + Assert.assertEquals(((TupleTypeInfo) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + } } - + @Test public void testFlatJoinLambda() { FlatJoinFunction, Boolean>, Tuple2, Double>, Tuple2, String>> f = (i1, i2, o) -> {}; - + TypeInformation ti = TypeExtractor.getFlatJoinReturnTypes(f, TypeInfoParser.parse("Tuple2, Boolean>"), TypeInfoParser.parse("Tuple2, Double>")); - 
Assert.assertTrue(ti.isTupleType()); - Assert.assertEquals(2, ti.getArity()); - Assert.assertTrue(((TupleTypeInfo) ti).getTypeAt(0).isTupleType()); - Assert.assertEquals(((TupleTypeInfo) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + if (!(ti instanceof MissingTypeInfo)) { + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + Assert.assertTrue(((TupleTypeInfo) ti).getTypeAt(0).isTupleType()); + Assert.assertEquals(((TupleTypeInfo) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + } } - + @Test public void testJoinLambda() { JoinFunction, Boolean>, Tuple2, Double>, Tuple2, String>> f = (i1, i2) -> null; - + TypeInformation ti = TypeExtractor.getJoinReturnTypes(f, TypeInfoParser.parse("Tuple2, Boolean>"), TypeInfoParser.parse("Tuple2, Double>")); - Assert.assertTrue(ti.isTupleType()); - Assert.assertEquals(2, ti.getArity()); - Assert.assertTrue(((TupleTypeInfo) ti).getTypeAt(0).isTupleType()); - Assert.assertEquals(((TupleTypeInfo) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + if (!(ti instanceof MissingTypeInfo)) { + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + Assert.assertTrue(((TupleTypeInfo) ti).getTypeAt(0).isTupleType()); + Assert.assertEquals(((TupleTypeInfo) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + } } - + @Test public void testCoGroupLambda() { CoGroupFunction, Boolean>, Tuple2, Double>, Tuple2, String>> f = (i1, i2, o) -> {}; - + TypeInformation ti = TypeExtractor.getCoGroupReturnTypes(f, TypeInfoParser.parse("Tuple2, Boolean>"), TypeInfoParser.parse("Tuple2, Double>")); - Assert.assertTrue(ti.isTupleType()); - Assert.assertEquals(2, ti.getArity()); - Assert.assertTrue(((TupleTypeInfo) ti).getTypeAt(0).isTupleType()); - Assert.assertEquals(((TupleTypeInfo) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + if (!(ti instanceof MissingTypeInfo)) { + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + Assert.assertTrue(((TupleTypeInfo) 
ti).getTypeAt(0).isTupleType()); + Assert.assertEquals(((TupleTypeInfo) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + } } - + @Test public void testCrossLambda() { CrossFunction, Boolean>, Tuple2, Double>, Tuple2, String>> f = (i1, i2) -> null; - + TypeInformation ti = TypeExtractor.getCrossReturnTypes(f, TypeInfoParser.parse("Tuple2, Boolean>"), TypeInfoParser.parse("Tuple2, Double>")); - Assert.assertTrue(ti.isTupleType()); - Assert.assertEquals(2, ti.getArity()); - Assert.assertTrue(((TupleTypeInfo) ti).getTypeAt(0).isTupleType()); - Assert.assertEquals(((TupleTypeInfo) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + if (!(ti instanceof MissingTypeInfo)) { + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + Assert.assertTrue(((TupleTypeInfo) ti).getTypeAt(0).isTupleType()); + Assert.assertEquals(((TupleTypeInfo) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + } } - + @Test public void testKeySelectorLambda() { KeySelector, Boolean>, Tuple2, String>> f = (i) -> null; - + TypeInformation ti = TypeExtractor.getKeySelectorTypes(f, TypeInfoParser.parse("Tuple2, Boolean>")); - Assert.assertTrue(ti.isTupleType()); - Assert.assertEquals(2, ti.getArity()); - Assert.assertTrue(((TupleTypeInfo) ti).getTypeAt(0).isTupleType()); - Assert.assertEquals(((TupleTypeInfo) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + if (!(ti instanceof MissingTypeInfo)) { + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + Assert.assertTrue(((TupleTypeInfo) ti).getTypeAt(0).isTupleType()); + Assert.assertEquals(((TupleTypeInfo) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + } } - + @SuppressWarnings("rawtypes") @Test - public void testLambdaTypeErasureException() { + public void testLambdaTypeErasure() { MapFunction f = (i) -> null; - - try { - TypeExtractor.getMapReturnTypes(f, TypeInfoParser.parse("Tuple1")); - Assert.fail(); - } - catch (InvalidTypesException e) { - // ok - } + TypeInformation ti = 
TypeExtractor.getMapReturnTypes(f, TypeInfoParser.parse("Tuple1"), null, true); + Assert.assertTrue(ti instanceof MissingTypeInfo); } - + } diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java new file mode 100644 index 0000000000000..350227a965e1e --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.javaApiOperators; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Collection; +import java.util.LinkedList; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; +import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.util.Collector; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class TypeHintITCase extends JavaProgramTestBase { + + private static int NUM_PROGRAMS = 3; + + private int curProgId = config.getInteger("ProgramId", -1); + private String resultPath; + private String expectedResult; + + public TypeHintITCase(Configuration config) { + super(config); + } + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + @Override + protected void testProgram() throws Exception { + expectedResult = TypeHintProgs.runProgram(curProgId, resultPath); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Parameters + public static Collection getConfigurations() throws FileNotFoundException, IOException { + + LinkedList tConfigs = new LinkedList(); + + for(int i=1; i <= NUM_PROGRAMS; i++) { + Configuration config = new Configuration(); + config.setInteger("ProgramId", i); + tConfigs.add(config); + } + + return toParameterList(tConfigs); + } + + private static class 
TypeHintProgs { + + public static String runProgram(int progId, String resultPath) throws Exception { + switch(progId) { + // Test identity map with missing types and string type hint + case 1: { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet> identityMapDs = ds + .map(new Mapper, Tuple3>()) + .returns("Tuple3"); + identityMapDs.writeAsText(resultPath); + env.execute(); + + // return expected result + return "(2,2,Hello)\n" + + "(3,2,Hello world)\n" + + "(1,1,Hi)\n"; + } + // Test identity map with missing types and type information type hint + case 2: { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet> identityMapDs = ds + // all following generics get erased during compilation + .map(new Mapper, Tuple3>()) + .returns(new TupleTypeInfo>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)); + identityMapDs.writeAsText(resultPath); + env.execute(); + + // return expected result + return "(2,2,Hello)\n" + + "(3,2,Hello world)\n" + + "(1,1,Hi)\n"; + } + // Test flat map with class type hint + case 3: { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> ds = CollectionDataSets.getSmall3TupleDataSet(env); + @SuppressWarnings({ "rawtypes", "unchecked" }) + DataSet identityMapDs = ds. 
+ flatMap(new FlatMapper, Integer>()) + .returns((Class) Integer.class); + identityMapDs.writeAsText(resultPath); + env.execute(); + + // return expected result + return "2\n" + + "3\n" + + "1\n"; + } + default: + throw new IllegalArgumentException("Invalid program id"); + } + } + } + + // -------------------------------------------------------------------------------------------- + + public static class Mapper implements MapFunction { + private static final long serialVersionUID = 1L; + + @SuppressWarnings("unchecked") + @Override + public V map(T value) throws Exception { + return (V) value; + } + } + + public static class FlatMapper implements FlatMapFunction { + private static final long serialVersionUID = 1L; + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public void flatMap(T value, Collector out) throws Exception { + out.collect((V) ((Tuple3)value).f0); + } + } + +} diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala index 9c255f3562148..ab90757915c94 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala @@ -57,6 +57,10 @@ class ScalaAPICompletenessTest { "org.apache.flink.api.java.ExecutionEnvironment.localExecutionIsAllowed", "org.apache.flink.api.java.ExecutionEnvironment.setDefaultLocalParallelism", + // TypeHints are only needed for Java API, Scala API doesn't need them + "org.apache.flink.api.java.operators.SingleInputUdfOperator.returns", + "org.apache.flink.api.java.operators.TwoInputUdfOperator.returns", + // This is really just a mapper, which in Scala can easily expressed as a map lambda "org.apache.flink.api.java.DataSet.writeAsFormattedText",